From 21f3ef540f6f492993ab98740154ca4c327e373e Mon Sep 17 00:00:00 2001 From: Rohit Malhotra Date: Sun, 31 Aug 2025 16:09:41 -0400 Subject: [PATCH] refactor: Apply GitHub mixins pattern to BitBucket service (#10728) Co-authored-by: openhands --- .../bitbucket/bitbucket_service.py | 726 +----------------- .../bitbucket/service/__init__.py | 13 + .../integrations/bitbucket/service/base.py | 248 ++++++ .../bitbucket/service/branches.py | 116 +++ .../bitbucket/service/features.py | 45 ++ .../integrations/bitbucket/service/prs.py | 100 +++ .../integrations/bitbucket/service/repos.py | 245 ++++++ 7 files changed, 782 insertions(+), 711 deletions(-) create mode 100644 openhands/integrations/bitbucket/service/__init__.py create mode 100644 openhands/integrations/bitbucket/service/base.py create mode 100644 openhands/integrations/bitbucket/service/branches.py create mode 100644 openhands/integrations/bitbucket/service/features.py create mode 100644 openhands/integrations/bitbucket/service/prs.py create mode 100644 openhands/integrations/bitbucket/service/repos.py diff --git a/openhands/integrations/bitbucket/bitbucket_service.py b/openhands/integrations/bitbucket/bitbucket_service.py index d2f5908566..da9de96873 100644 --- a/openhands/integrations/bitbucket/bitbucket_service.py +++ b/openhands/integrations/bitbucket/bitbucket_service.py @@ -1,32 +1,29 @@ -import base64 import os -import re -from typing import Any -import httpx from pydantic import SecretStr -from openhands.core.logger import openhands_logger as logger +from openhands.integrations.bitbucket.service import ( + BitBucketBranchesMixin, + BitBucketFeaturesMixin, + BitBucketPRsMixin, + BitBucketReposMixin, +) from openhands.integrations.service_types import ( - BaseGitService, - Branch, GitService, InstallationsService, - OwnerType, - PaginatedBranchesResponse, ProviderType, - Repository, - RequestMethod, - ResourceNotFoundError, - SuggestedTask, - User, ) -from openhands.microagent.types import MicroagentContentResponse -from openhands.server.types import AppMode from openhands.utils.import_utils import get_impl -class BitBucketService(BaseGitService, GitService, InstallationsService): +class BitBucketService( + BitBucketReposMixin, + BitBucketBranchesMixin, + BitBucketPRsMixin, + BitBucketFeaturesMixin, + GitService, + InstallationsService, +): """Default implementation of GitService for Bitbucket integration. This is an extension point in OpenHands that allows applications to customize Bitbucket @@ -38,10 +35,6 @@ class BitBucketService(BaseGitService, GitService, InstallationsService): The class is instantiated via get_impl() in openhands.server.shared.py. """ - BASE_URL = 'https://api.bitbucket.org/2.0' - token: SecretStr = SecretStr('') - refresh = False - def __init__( self, user_id: str | None = None, @@ -50,7 +43,7 @@ class BitBucketService(BaseGitService, GitService, InstallationsService): token: SecretStr | None = None, external_token_manager: bool = False, base_domain: str | None = None, - ): + ) -> None: self.user_id = user_id self.external_token_manager = external_token_manager self.external_auth_id = external_auth_id @@ -66,695 +59,6 @@ class BitBucketService(BaseGitService, GitService, InstallationsService): def provider(self) -> str: return ProviderType.BITBUCKET.value - def _extract_owner_and_repo(self, repository: str) -> tuple[str, str]: - """Extract owner and repo from repository string. - - Args: - repository: Repository name in format 'workspace/repo_slug' - - Returns: - Tuple of (owner, repo) - - Raises: - ValueError: If repository format is invalid - """ - parts = repository.split('/') - if len(parts) < 2: - raise ValueError(f'Invalid repository name: {repository}') - - return parts[-2], parts[-1] - - async def get_latest_token(self) -> SecretStr | None: - """Get latest working token of the user.""" - return self.token - - async def _get_cursorrules_url(self, repository: str) -> str: - """Get the URL for checking .cursorrules file.""" - # Get repository details to get the main branch - repo_details = await self.get_repository_details_from_repo_name(repository) - if not repo_details.main_branch: - raise ResourceNotFoundError( - f'Main branch not found for repository {repository}. ' - f'This repository may be empty or have no default branch configured.' - ) - return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/.cursorrules' - - async def _get_microagents_directory_url( - self, repository: str, microagents_path: str - ) -> str: - """Get the URL for checking microagents directory.""" - # Get repository details to get the main branch - repo_details = await self.get_repository_details_from_repo_name(repository) - if not repo_details.main_branch: - raise ResourceNotFoundError( - f'Main branch not found for repository {repository}. ' - f'This repository may be empty or have no default branch configured.' - ) - return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{microagents_path}' - - def _get_microagents_directory_params(self, microagents_path: str) -> dict | None: - """Get parameters for the microagents directory request. Return None if no parameters needed.""" - return None - - def _is_valid_microagent_file(self, item: dict) -> bool: - """Check if an item represents a valid microagent file.""" - return ( - item['type'] == 'commit_file' - and item['path'].endswith('.md') - and not item['path'].endswith('README.md') - ) - - def _get_file_name_from_item(self, item: dict) -> str: - """Extract file name from directory item.""" - return item['path'].split('/')[-1] - - def _get_file_path_from_item(self, item: dict, microagents_path: str) -> str: - """Extract file path from directory item.""" - return item['path'] - - def _has_token_expired(self, status_code: int) -> bool: - return status_code == 401 - - async def _get_bitbucket_headers(self) -> dict[str, str]: - """Get headers for Bitbucket API requests.""" - token_value = self.token.get_secret_value() - - # Check if the token contains a colon, which indicates it's in username:password format - if ':' in token_value: - auth_str = base64.b64encode(token_value.encode()).decode() - return { - 'Authorization': f'Basic {auth_str}', - 'Accept': 'application/json', - } - else: - return { - 'Authorization': f'Bearer {token_value}', - 'Accept': 'application/json', - } - - async def _make_request( - self, - url: str, - params: dict | None = None, - method: RequestMethod = RequestMethod.GET, - ) -> tuple[Any, dict]: - """Make a request to the Bitbucket API. - - Args: - url: The URL to request - params: Optional parameters for the request - method: The HTTP method to use - - Returns: - A tuple of (response_data, response_headers) - - """ - try: - async with httpx.AsyncClient() as client: - bitbucket_headers = await self._get_bitbucket_headers() - response = await self.execute_request( - client, url, bitbucket_headers, params, method - ) - if self.refresh and self._has_token_expired(response.status_code): - await self.get_latest_token() - bitbucket_headers = await self._get_bitbucket_headers() - response = await self.execute_request( - client=client, - url=url, - headers=bitbucket_headers, - params=params, - method=method, - ) - response.raise_for_status() - return response.json(), dict(response.headers) - except httpx.HTTPStatusError as e: - raise self.handle_http_status_error(e) - except httpx.HTTPError as e: - raise self.handle_http_error(e) - - async def get_user(self) -> User: - """Get the authenticated user's information.""" - url = f'{self.BASE_URL}/user' - data, _ = await self._make_request(url) - - account_id = data.get('account_id', '') - - return User( - id=account_id, - login=data.get('username', ''), - avatar_url=data.get('links', {}).get('avatar', {}).get('href', ''), - name=data.get('display_name'), - email=None, # Bitbucket API doesn't return email in this endpoint - ) - - def _parse_repository( - self, repo: dict, link_header: str | None = None - ) -> Repository: - """Parse a Bitbucket API repository response into a Repository object. - - Args: - repo: Repository data from Bitbucket API - link_header: Optional link header for pagination - - Returns: - Repository object - """ - repo_id = repo.get('uuid', '') - - workspace_slug = repo.get('workspace', {}).get('slug', '') - repo_slug = repo.get('slug', '') - full_name = ( - f'{workspace_slug}/{repo_slug}' if workspace_slug and repo_slug else '' - ) - - is_public = not repo.get('is_private', True) - owner_type = OwnerType.ORGANIZATION - main_branch = repo.get('mainbranch', {}).get('name') - - return Repository( - id=repo_id, - full_name=full_name, # type: ignore[arg-type] - git_provider=ProviderType.BITBUCKET, - is_public=is_public, - stargazers_count=None, # Bitbucket doesn't have stars - pushed_at=repo.get('updated_on'), - owner_type=owner_type, - link_header=link_header, - main_branch=main_branch, - ) - - async def search_repositories( - self, query: str, per_page: int, sort: str, order: str, public: bool - ) -> list[Repository]: - """Search for repositories.""" - repositories = [] - - if public: - # Extract workspace and repo from URL - # URL format: https://{domain}/{workspace}/{repo}/{additional_params} - # Split by '/' and find workspace and repo parts - url_parts = query.split('/') - if len(url_parts) >= 5: # https:, '', domain, workspace, repo - workspace_slug = url_parts[3] - repo_name = url_parts[4] - - repo = await self.get_repository_details_from_repo_name( - f'{workspace_slug}/{repo_name}' - ) - repositories.append(repo) - - return repositories - - # Search for repos once workspace prefix exists - if '/' in query: - workspace_slug, repo_query = query.split('/', 1) - return await self.get_paginated_repos( - 1, per_page, sort, workspace_slug, repo_query - ) - - all_installations = await self.get_installations() - - # Workspace prefix isn't complete. Search workspace names and repos underneath each workspace - matching_workspace_slugs = [ - installation for installation in all_installations if query in installation - ] - for workspace_slug in matching_workspace_slugs: - # Get repositories where query matches workspace name - try: - repos = await self.get_paginated_repos( - 1, per_page, sort, workspace_slug - ) - repositories.extend(repos) - except Exception: - continue - - for workspace_slug in all_installations: - # Get repositories in all workspaces where query matches repo name - try: - repos = await self.get_paginated_repos( - 1, per_page, sort, workspace_slug, query - ) - repositories.extend(repos) - except Exception: - continue - - return repositories - - async def _get_user_workspaces(self) -> list[dict[str, Any]]: - """Get all workspaces the user has access to""" - url = f'{self.BASE_URL}/workspaces' - data, _ = await self._make_request(url) - return data.get('values', []) - - async def _fetch_paginated_data( - self, url: str, params: dict, max_items: int - ) -> list[dict]: - """Fetch data with pagination support for Bitbucket API. - - Args: - url: The API endpoint URL - params: Query parameters for the request - max_items: Maximum number of items to fetch - - Returns: - List of data items from all pages - """ - all_items: list[dict] = [] - current_url = url - - while current_url and len(all_items) < max_items: - response, _ = await self._make_request(current_url, params) - - # Extract items from response - page_items = response.get('values', []) - if not page_items: # No more items - break - - all_items.extend(page_items) - - # Get the next page URL from the response - current_url = response.get('next') - - # Clear params for subsequent requests since the next URL already contains all parameters - params = {} - - return all_items[:max_items] # Trim to max_items if needed - - async def get_installations( - self, query: str | None = None, limit: int = 100 - ) -> list[str]: - workspaces_url = f'{self.BASE_URL}/workspaces' - params = {} - if query: - params['q'] = f'name~"{query}"' - - workspaces = await self._fetch_paginated_data(workspaces_url, params, limit) - - installations: list[str] = [] - for workspace in workspaces: - installations.append(workspace['slug']) - - return installations - - async def get_paginated_repos( - self, - page: int, - per_page: int, - sort: str, - installation_id: str | None, - query: str | None = None, - ) -> list[Repository]: - """Get paginated repositories for a specific workspace. - - Args: - page: The page number to fetch - per_page: The number of repositories per page - sort: The sort field ('pushed', 'updated', 'created', 'full_name') - installation_id: The workspace slug to fetch repositories from (as int, will be converted to string) - - Returns: - A list of Repository objects - """ - if not installation_id: - return [] - - # Convert installation_id to string for use as workspace_slug - workspace_slug = installation_id - workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}' - - # Map sort parameter to Bitbucket API compatible values - bitbucket_sort = sort - if sort == 'pushed': - # Bitbucket doesn't support 'pushed', use 'updated_on' instead - bitbucket_sort = '-updated_on' # Use negative prefix for descending order - elif sort == 'updated': - bitbucket_sort = '-updated_on' - elif sort == 'created': - bitbucket_sort = '-created_on' - elif sort == 'full_name': - bitbucket_sort = 'name' # Bitbucket uses 'name' not 'full_name' - else: - # Default to most recently updated first - bitbucket_sort = '-updated_on' - - params = { - 'pagelen': per_page, - 'page': page, - 'sort': bitbucket_sort, - } - - if query: - params['q'] = f'name~"{query}"' - - response, headers = await self._make_request(workspace_repos_url, params) - - # Extract repositories from the response - repos = response.get('values', []) - - # Extract next URL from response - next_link = response.get('next', '') - - # Format the link header in a way that the frontend can understand - # The frontend expects a format like: ; rel="next" - # where the URL contains a page parameter - formatted_link_header = '' - if next_link: - # Extract the page number from the next URL if possible - page_match = re.search(r'[?&]page=(\d+)', next_link) - if page_match: - next_page = page_match.group(1) - # Format it in a way that extractNextPageFromLink in frontend can parse - formatted_link_header = ( - f'<{workspace_repos_url}?page={next_page}>; rel="next"' - ) - else: - # If we can't extract the page, just use the next URL as is - formatted_link_header = f'<{next_link}>; rel="next"' - - repositories = [ - self._parse_repository(repo, link_header=formatted_link_header) - for repo in repos - ] - - return repositories - - async def get_all_repositories( - self, sort: str, app_mode: AppMode - ) -> list[Repository]: - """Get repositories for the authenticated user using workspaces endpoint. - - This method gets all repositories (both public and private) that the user has access to - by iterating through their workspaces and fetching repositories from each workspace. - This approach is more comprehensive and efficient than the previous implementation - that made separate calls for public and private repositories. - """ - MAX_REPOS = 1000 - PER_PAGE = 100 # Maximum allowed by Bitbucket API - repositories: list[Repository] = [] - - # Get user's workspaces with pagination - workspaces_url = f'{self.BASE_URL}/workspaces' - workspaces = await self._fetch_paginated_data(workspaces_url, {}, MAX_REPOS) - - for workspace in workspaces: - workspace_slug = workspace.get('slug') - if not workspace_slug: - continue - - # Get repositories for this workspace with pagination - workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}' - - # Map sort parameter to Bitbucket API compatible values and ensure descending order - # to show most recently changed repos at the top - bitbucket_sort = sort - if sort == 'pushed': - # Bitbucket doesn't support 'pushed', use 'updated_on' instead - bitbucket_sort = ( - '-updated_on' # Use negative prefix for descending order - ) - elif sort == 'updated': - bitbucket_sort = '-updated_on' - elif sort == 'created': - bitbucket_sort = '-created_on' - elif sort == 'full_name': - bitbucket_sort = 'name' # Bitbucket uses 'name' not 'full_name' - else: - # Default to most recently updated first - bitbucket_sort = '-updated_on' - - params = { - 'pagelen': PER_PAGE, - 'sort': bitbucket_sort, - } - - # Fetch all repositories for this workspace with pagination - workspace_repos = await self._fetch_paginated_data( - workspace_repos_url, params, MAX_REPOS - len(repositories) - ) - - for repo in workspace_repos: - repositories.append(self._parse_repository(repo)) - - # Stop if we've reached the maximum number of repositories - if len(repositories) >= MAX_REPOS: - break - - # Stop if we've reached the maximum number of repositories - if len(repositories) >= MAX_REPOS: - break - - return repositories - - async def get_suggested_tasks(self) -> list[SuggestedTask]: - """Get suggested tasks for the authenticated user across all repositories.""" - # TODO: implemented suggested tasks - return [] - - async def get_repository_details_from_repo_name( - self, repository: str - ) -> Repository: - """Gets all repository details from repository name.""" - owner, repo = self._extract_owner_and_repo(repository) - - url = f'{self.BASE_URL}/repositories/{owner}/{repo}' - data, _ = await self._make_request(url) - - return self._parse_repository(data) - - async def get_branches(self, repository: str) -> list[Branch]: - """Get branches for a repository.""" - owner, repo = self._extract_owner_and_repo(repository) - - url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' - - # Set maximum branches to fetch (similar to GitHub/GitLab implementations) - MAX_BRANCHES = 1000 - PER_PAGE = 100 - - params = { - 'pagelen': PER_PAGE, - 'sort': '-target.date', # Sort by most recent commit date, descending - } - - # Fetch all branches with pagination - branch_data = await self._fetch_paginated_data(url, params, MAX_BRANCHES) - - branches = [] - for branch in branch_data: - branches.append( - Branch( - name=branch.get('name', ''), - commit_sha=branch.get('target', {}).get('hash', ''), - protected=False, # Bitbucket doesn't expose this in the API - last_push_date=branch.get('target', {}).get('date', None), - ) - ) - - return branches - - async def get_paginated_branches( - self, repository: str, page: int = 1, per_page: int = 30 - ) -> PaginatedBranchesResponse: - """Get branches for a repository with pagination.""" - # Extract owner and repo from the repository string (e.g., "owner/repo") - parts = repository.split('/') - if len(parts) < 2: - raise ValueError(f'Invalid repository name: {repository}') - - owner = parts[-2] - repo = parts[-1] - - url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' - - params = { - 'pagelen': per_page, - 'page': page, - 'sort': '-target.date', # Sort by most recent commit date, descending - } - - response, _ = await self._make_request(url, params) - - branches = [] - for branch in response.get('values', []): - branches.append( - Branch( - name=branch.get('name', ''), - commit_sha=branch.get('target', {}).get('hash', ''), - protected=False, # Bitbucket doesn't expose this in the API - last_push_date=branch.get('target', {}).get('date', None), - ) - ) - - # Bitbucket provides pagination info in the response - has_next_page = response.get('next') is not None - total_count = response.get('size') # Total number of items - - return PaginatedBranchesResponse( - branches=branches, - has_next_page=has_next_page, - current_page=page, - per_page=per_page, - total_count=total_count, - ) - - async def search_branches( - self, repository: str, query: str, per_page: int = 30 - ) -> list[Branch]: - """Search branches by name using Bitbucket API with `q` param.""" - parts = repository.split('/') - if len(parts) < 2: - raise ValueError(f'Invalid repository name: {repository}') - - owner = parts[-2] - repo = parts[-1] - - url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' - # Bitbucket filtering: name ~ "query" - params = { - 'pagelen': per_page, - 'q': f'name~"{query}"', - 'sort': '-target.date', - } - response, _ = await self._make_request(url, params) - - branches: list[Branch] = [] - for branch in response.get('values', []): - branches.append( - Branch( - name=branch.get('name', ''), - commit_sha=branch.get('target', {}).get('hash', ''), - protected=False, - last_push_date=branch.get('target', {}).get('date', None), - ) - ) - return branches - - async def create_pr( - self, - repo_name: str, - source_branch: str, - target_branch: str, - title: str, - body: str | None = None, - draft: bool = False, - ) -> str: - """Creates a pull request in Bitbucket. - - Args: - repo_name: The repository name in the format "workspace/repo" - source_branch: The source branch name - target_branch: The target branch name - title: The title of the pull request - body: The description of the pull request - draft: Whether to create a draft pull request - - Returns: - The URL of the created pull request - """ - owner, repo = self._extract_owner_and_repo(repo_name) - - url = f'{self.BASE_URL}/repositories/{owner}/{repo}/pullrequests' - - payload = { - 'title': title, - 'description': body or '', - 'source': {'branch': {'name': source_branch}}, - 'destination': {'branch': {'name': target_branch}}, - 'close_source_branch': False, - 'draft': draft, - } - - data, _ = await self._make_request( - url=url, params=payload, method=RequestMethod.POST - ) - - # Return the URL to the pull request - return data.get('links', {}).get('html', {}).get('href', '') - - async def get_pr_details(self, repository: str, pr_number: int) -> dict: - """Get detailed information about a specific pull request - - Args: - repository: Repository name in format 'owner/repo' - pr_number: The pull request number - - Returns: - Raw Bitbucket API response for the pull request - """ - url = f'{self.BASE_URL}/repositories/{repository}/pullrequests/{pr_number}' - pr_data, _ = await self._make_request(url) - - return pr_data - - async def get_microagent_content( - self, repository: str, file_path: str - ) -> MicroagentContentResponse: - """Fetch individual file content from Bitbucket repository. - - Args: - repository: Repository name in format 'workspace/repo_slug' - file_path: Path to the file within the repository - - Returns: - MicroagentContentResponse with parsed content and triggers - - Raises: - RuntimeError: If file cannot be fetched or doesn't exist - """ - # Step 1: Get repository details using existing method - repo_details = await self.get_repository_details_from_repo_name(repository) - - if not repo_details.main_branch: - logger.warning( - f'No main branch found in repository info for {repository}. ' - f'Repository response: mainbranch field missing' - ) - raise ResourceNotFoundError( - f'Main branch not found for repository {repository}. ' - f'This repository may be empty or have no default branch configured.' - ) - - # Step 2: Get file content using the main branch - file_url = f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{file_path}' - response, _ = await self._make_request(file_url) - - # Parse the content to extract triggers from frontmatter - return self._parse_microagent_content(response, file_path) - - async def is_pr_open(self, repository: str, pr_number: int) -> bool: - """Check if a Bitbucket pull request is still active (not closed/merged). - - Args: - repository: Repository name in format 'owner/repo' - pr_number: The PR number to check - - Returns: - True if PR is active (OPEN), False if closed/merged - """ - try: - pr_details = await self.get_pr_details(repository, pr_number) - - # Bitbucket API response structure - # https://developer.atlassian.com/cloud/bitbucket/rest/api-group-pullrequests/#api-repositories-workspace-repo-slug-pullrequests-pull-request-id-get - if 'state' in pr_details: - # Bitbucket state values: OPEN, MERGED, DECLINED, SUPERSEDED - return pr_details['state'] == 'OPEN' - - # If we can't determine the state, assume it's active (safer default) - logger.warning( - f'Could not determine Bitbucket PR status for {repository}#{pr_number}. ' - f'Response keys: {list(pr_details.keys())}. Assuming PR is active.' - ) - return True - - except Exception as e: - logger.warning( - f'Could not determine Bitbucket PR status for {repository}#{pr_number}: {e}. ' - f'Including conversation to be safe.' - ) - # If we can't determine the PR status, include the conversation to be safe - return True - bitbucket_service_cls = os.environ.get( 'OPENHANDS_BITBUCKET_SERVICE_CLS', diff --git a/openhands/integrations/bitbucket/service/__init__.py b/openhands/integrations/bitbucket/service/__init__.py new file mode 100644 index 0000000000..836fb91da9 --- /dev/null +++ b/openhands/integrations/bitbucket/service/__init__.py @@ -0,0 +1,13 @@ +from .base import BitBucketMixinBase +from .branches import BitBucketBranchesMixin +from .features import BitBucketFeaturesMixin +from .prs import BitBucketPRsMixin +from .repos import BitBucketReposMixin + +__all__ = [ + 'BitBucketMixinBase', + 'BitBucketBranchesMixin', + 'BitBucketFeaturesMixin', + 'BitBucketPRsMixin', + 'BitBucketReposMixin', +] diff --git a/openhands/integrations/bitbucket/service/base.py b/openhands/integrations/bitbucket/service/base.py new file mode 100644 index 0000000000..be135c5ca5 --- /dev/null +++ b/openhands/integrations/bitbucket/service/base.py @@ -0,0 +1,248 @@ +import base64 +from typing import Any + +import httpx +from pydantic import SecretStr + +from openhands.integrations.service_types import ( + BaseGitService, + OwnerType, + ProviderType, + Repository, + RequestMethod, + ResourceNotFoundError, + User, +) + + +class BitBucketMixinBase(BaseGitService): + """ + Base mixin for BitBucket service containing common functionality + """ + + BASE_URL = 'https://api.bitbucket.org/2.0' + token: SecretStr = SecretStr('') + refresh = False + + def _extract_owner_and_repo(self, repository: str) -> tuple[str, str]: + """Extract owner and repo from repository string. + + Args: + repository: Repository name in format 'workspace/repo_slug' + + Returns: + Tuple of (owner, repo) + + Raises: + ValueError: If repository format is invalid + """ + parts = repository.split('/') + if len(parts) < 2: + raise ValueError(f'Invalid repository name: {repository}') + + return parts[-2], parts[-1] + + async def get_latest_token(self) -> SecretStr | None: + """Get latest working token of the user.""" + return self.token + + def _has_token_expired(self, status_code: int) -> bool: + return status_code == 401 + + async def _get_bitbucket_headers(self) -> dict[str, str]: + """Get headers for Bitbucket API requests.""" + token_value = self.token.get_secret_value() + + # Check if the token contains a colon, which indicates it's in username:password format + if ':' in token_value: + auth_str = base64.b64encode(token_value.encode()).decode() + return { + 'Authorization': f'Basic {auth_str}', + 'Accept': 'application/json', + } + else: + return { + 'Authorization': f'Bearer {token_value}', + 'Accept': 'application/json', + } + + async def _make_request( + self, + url: str, + params: dict | None = None, + method: RequestMethod = RequestMethod.GET, + ) -> tuple[Any, dict]: + """Make a request to the Bitbucket API. + + Args: + url: The URL to request + params: Optional parameters for the request + method: The HTTP method to use + + Returns: + A tuple of (response_data, response_headers) + + """ + try: + async with httpx.AsyncClient() as client: + bitbucket_headers = await self._get_bitbucket_headers() + response = await self.execute_request( + client, url, bitbucket_headers, params, method + ) + if self.refresh and self._has_token_expired(response.status_code): + await self.get_latest_token() + bitbucket_headers = await self._get_bitbucket_headers() + response = await self.execute_request( + client=client, + url=url, + headers=bitbucket_headers, + params=params, + method=method, + ) + response.raise_for_status() + return response.json(), dict(response.headers) + except httpx.HTTPStatusError as e: + raise self.handle_http_status_error(e) + except httpx.HTTPError as e: + raise self.handle_http_error(e) + + async def _fetch_paginated_data( + self, url: str, params: dict, max_items: int + ) -> list[dict]: + """Fetch data with pagination support for Bitbucket API. + + Args: + url: The API endpoint URL + params: Query parameters for the request + max_items: Maximum number of items to fetch + + Returns: + List of data items from all pages + """ + all_items: list[dict] = [] + current_url = url + + while current_url and len(all_items) < max_items: + response, _ = await self._make_request(current_url, params) + + # Extract items from response + page_items = response.get('values', []) + all_items.extend(page_items) + + # Get next page URL from response + current_url = response.get('next') + + # Clear params for subsequent requests as they're included in the next URL + params = {} + + return all_items[:max_items] + + async def get_user(self) -> User: + """Get the authenticated user's information.""" + url = f'{self.BASE_URL}/user' + data, _ = await self._make_request(url) + + account_id = data.get('account_id', '') + + return User( + id=account_id, + login=data.get('username', ''), + avatar_url=data.get('links', {}).get('avatar', {}).get('href', ''), + name=data.get('display_name'), + email=None, # Bitbucket API doesn't return email in this endpoint + ) + + def _parse_repository( + self, repo: dict, link_header: str | None = None + ) -> Repository: + """Parse a Bitbucket API repository response into a Repository object. + + Args: + repo: Repository data from Bitbucket API + link_header: Optional link header for pagination + + Returns: + Repository object + """ + repo_id = repo.get('uuid', '') + + workspace_slug = repo.get('workspace', {}).get('slug', '') + repo_slug = repo.get('slug', '') + full_name = ( + f'{workspace_slug}/{repo_slug}' if workspace_slug and repo_slug else '' + ) + + is_public = not repo.get('is_private', True) + owner_type = OwnerType.ORGANIZATION + main_branch = repo.get('mainbranch', {}).get('name') + + return Repository( + id=repo_id, + full_name=full_name, # type: ignore[arg-type] + git_provider=ProviderType.BITBUCKET, + is_public=is_public, + stargazers_count=None, # Bitbucket doesn't have stars + pushed_at=repo.get('updated_on'), + owner_type=owner_type, + link_header=link_header, + main_branch=main_branch, + ) + + async def get_repository_details_from_repo_name( + self, repository: str + ) -> Repository: + """Get repository details from repository name. + + Args: + repository: Repository name in format 'workspace/repo_slug' + + Returns: + Repository object with details + """ + url = f'{self.BASE_URL}/repositories/{repository}' + data, _ = await self._make_request(url) + return self._parse_repository(data) + + async def _get_cursorrules_url(self, repository: str) -> str: + """Get the URL for checking .cursorrules file.""" + # Get repository details to get the main branch + repo_details = await self.get_repository_details_from_repo_name(repository) + if not repo_details.main_branch: + raise ResourceNotFoundError( + f'Main branch not found for repository {repository}. ' + f'This repository may be empty or have no default branch configured.' + ) + return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/.cursorrules' + + async def _get_microagents_directory_url( + self, repository: str, microagents_path: str + ) -> str: + """Get the URL for checking microagents directory.""" + # Get repository details to get the main branch + repo_details = await self.get_repository_details_from_repo_name(repository) + if not repo_details.main_branch: + raise ResourceNotFoundError( + f'Main branch not found for repository {repository}. ' + f'This repository may be empty or have no default branch configured.' + ) + return f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{microagents_path}' + + def _get_microagents_directory_params(self, microagents_path: str) -> dict | None: + """Get parameters for the microagents directory request. Return None if no parameters needed.""" + return None + + def _is_valid_microagent_file(self, item: dict) -> bool: + """Check if an item represents a valid microagent file.""" + return ( + item['type'] == 'commit_file' + and item['path'].endswith('.md') + and not item['path'].endswith('README.md') + ) + + def _get_file_name_from_item(self, item: dict) -> str: + """Extract file name from directory item.""" + return item['path'].split('/')[-1] + + def _get_file_path_from_item(self, item: dict, microagents_path: str) -> str: + """Extract file path from directory item.""" + return item['path'] diff --git a/openhands/integrations/bitbucket/service/branches.py b/openhands/integrations/bitbucket/service/branches.py new file mode 100644 index 0000000000..4d04cf8ca9 --- /dev/null +++ b/openhands/integrations/bitbucket/service/branches.py @@ -0,0 +1,116 @@ +from openhands.integrations.bitbucket.service.base import BitBucketMixinBase +from openhands.integrations.service_types import Branch, PaginatedBranchesResponse + + +class BitBucketBranchesMixin(BitBucketMixinBase): + """ + Mixin for BitBucket branch-related operations + """ + + async def get_branches(self, repository: str) -> list[Branch]: + """Get branches for a repository.""" + owner, repo = self._extract_owner_and_repo(repository) + + url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' + + # Set maximum branches to fetch (similar to GitHub/GitLab implementations) + MAX_BRANCHES = 1000 + PER_PAGE = 100 + + params = { + 'pagelen': PER_PAGE, + 'sort': '-target.date', # Sort by most recent commit date, descending + } + + # Fetch all branches with pagination + branch_data = await self._fetch_paginated_data(url, params, MAX_BRANCHES) + + branches = [] + for branch in branch_data: + branches.append( + Branch( + name=branch.get('name', ''), + commit_sha=branch.get('target', {}).get('hash', ''), + protected=False, # Bitbucket doesn't expose this in the API + last_push_date=branch.get('target', {}).get('date', None), + ) + ) + + return branches + + async def get_paginated_branches( + self, repository: str, page: int = 1, per_page: int = 30 + ) -> PaginatedBranchesResponse: + """Get branches for a repository with pagination.""" + # Extract owner and repo from the repository string (e.g., "owner/repo") + parts = repository.split('/') + if len(parts) < 2: + raise ValueError(f'Invalid repository name: {repository}') + + owner = parts[-2] + repo = parts[-1] + + url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' + + params = { + 'pagelen': per_page, + 'page': page, + 'sort': '-target.date', # Sort by most recent commit date, descending + } + + response, _ = await self._make_request(url, params) + + branches = [] + for branch in response.get('values', []): + branches.append( + Branch( + name=branch.get('name', ''), + commit_sha=branch.get('target', {}).get('hash', ''), + protected=False, # Bitbucket doesn't expose this in the API + last_push_date=branch.get('target', {}).get('date', None), + ) + ) + + # Bitbucket provides pagination info in the response + has_next_page = response.get('next') is not None + total_count = response.get('size') # Total number of items + + return PaginatedBranchesResponse( + branches=branches, + has_next_page=has_next_page, + current_page=page, + per_page=per_page, + total_count=total_count, + ) + + async def search_branches( + self, repository: str, query: str, per_page: int = 30 + ) -> list[Branch]: + """Search branches by name using Bitbucket API with `q` param.""" + parts = repository.split('/') + if len(parts) < 2: + raise ValueError(f'Invalid repository name: {repository}') + + owner = parts[-2] + repo = parts[-1] + + url = f'{self.BASE_URL}/repositories/{owner}/{repo}/refs/branches' + # Bitbucket filtering: name ~ "query" + params = { + 'pagelen': per_page, + 'q': f'name~"{query}"', + 'sort': '-target.date', + } + response, _ = await self._make_request(url, params) + + branches: list[Branch] = [] + for branch in response.get('values', []): + branches.append( + Branch( + name=branch.get('name', ''), + commit_sha=branch.get('target', {}).get('hash', ''), + protected=False, + last_push_date=branch.get('target', {}).get('date', None), + ) + ) + return branches diff --git a/openhands/integrations/bitbucket/service/features.py b/openhands/integrations/bitbucket/service/features.py new file mode 100644 index 0000000000..8451ad9603 --- /dev/null +++ b/openhands/integrations/bitbucket/service/features.py @@ -0,0 +1,45 @@ +from openhands.core.logger import openhands_logger as logger +from openhands.integrations.bitbucket.service.base import BitBucketMixinBase +from openhands.integrations.service_types import ResourceNotFoundError +from openhands.microagent.types import MicroagentContentResponse + + +class BitBucketFeaturesMixin(BitBucketMixinBase): + """ + Mixin for BitBucket feature operations (microagents, cursor rules, etc.) + """ + + async def get_microagent_content( + self, repository: str, file_path: str + ) -> MicroagentContentResponse: + """Fetch individual file content from Bitbucket repository. + + Args: + repository: Repository name in format 'workspace/repo_slug' + file_path: Path to the file within the repository + + Returns: + MicroagentContentResponse with parsed content and triggers + + Raises: + RuntimeError: If file cannot be fetched or doesn't exist + """ + # Step 1: Get repository details using existing method + repo_details = await self.get_repository_details_from_repo_name(repository) + + if not repo_details.main_branch: + logger.warning( + f'No main branch found in repository info for {repository}. ' + f'Repository response: mainbranch field missing' + ) + raise ResourceNotFoundError( + f'Main branch not found for repository {repository}. ' + f'This repository may be empty or have no default branch configured.' + ) + + # Step 2: Get file content using the main branch + file_url = f'{self.BASE_URL}/repositories/{repository}/src/{repo_details.main_branch}/{file_path}' + response, _ = await self._make_request(file_url) + + # Parse the content to extract triggers from frontmatter + return self._parse_microagent_content(response, file_path) diff --git a/openhands/integrations/bitbucket/service/prs.py b/openhands/integrations/bitbucket/service/prs.py new file mode 100644 index 0000000000..89e9d22dde --- /dev/null +++ b/openhands/integrations/bitbucket/service/prs.py @@ -0,0 +1,100 @@ +from openhands.core.logger import openhands_logger as logger +from openhands.integrations.bitbucket.service.base import BitBucketMixinBase +from openhands.integrations.service_types import RequestMethod + + +class BitBucketPRsMixin(BitBucketMixinBase): + """ + Mixin for BitBucket pull request operations + """ + + async def create_pr( + self, + repo_name: str, + source_branch: str, + target_branch: str, + title: str, + body: str | None = None, + draft: bool = False, + ) -> str: + """Creates a pull request in Bitbucket. + + Args: + repo_name: The repository name in the format "workspace/repo" + source_branch: The source branch name + target_branch: The target branch name + title: The title of the pull request + body: The description of the pull request + draft: Whether to create a draft pull request + + Returns: + The URL of the created pull request + """ + owner, repo = self._extract_owner_and_repo(repo_name) + + url = f'{self.BASE_URL}/repositories/{owner}/{repo}/pullrequests' + + payload = { + 'title': title, + 'description': body or '', + 'source': {'branch': {'name': source_branch}}, + 'destination': {'branch': {'name': target_branch}}, + 'close_source_branch': False, + 'draft': draft, + } + + data, _ = await self._make_request( + url=url, params=payload, method=RequestMethod.POST + ) + + # Return the URL to the pull request + return data.get('links', {}).get('html', {}).get('href', '') + + async def get_pr_details(self, repository: str, pr_number: int) -> dict: + """Get detailed information about a specific pull request + + Args: + repository: Repository name in format 'owner/repo' + pr_number: The pull request number + + Returns: + Raw Bitbucket API response for the pull request + """ + url = f'{self.BASE_URL}/repositories/{repository}/pullrequests/{pr_number}' + pr_data, _ = await self._make_request(url) + + return pr_data + + async def is_pr_open(self, repository: str, pr_number: int) -> bool: + """Check if a Bitbucket pull request is still active (not closed/merged). + + Args: + repository: Repository name in format 'owner/repo' + pr_number: The PR number to check + + Returns: + True if PR is active (OPEN), False if closed/merged + """ + try: + pr_details = await self.get_pr_details(repository, pr_number) + + # Bitbucket API response structure + # https://developer.atlassian.com/cloud/bitbucket/rest/api-group-pullrequests/#api-repositories-workspace-repo-slug-pullrequests-pull-request-id-get + if 'state' in pr_details: + # Bitbucket state values: OPEN, MERGED, DECLINED, SUPERSEDED + return pr_details['state'] == 'OPEN' + + # If we can't determine the state, assume it's active (safer default) + logger.warning( + f'Could not determine Bitbucket PR status for {repository}#{pr_number}. ' + f'Response keys: {list(pr_details.keys())}. Assuming PR is active.' + ) + return True + + except Exception as e: + logger.warning( + f'Could not determine Bitbucket PR status for {repository}#{pr_number}: {e}. ' + f'Including conversation to be safe.' + ) + # If we can't determine the PR status, include the conversation to be safe + return True diff --git a/openhands/integrations/bitbucket/service/repos.py b/openhands/integrations/bitbucket/service/repos.py new file mode 100644 index 0000000000..1582c5f71d --- /dev/null +++ b/openhands/integrations/bitbucket/service/repos.py @@ -0,0 +1,245 @@ +import re +from typing import Any + +from openhands.integrations.bitbucket.service.base import BitBucketMixinBase +from openhands.integrations.service_types import Repository, SuggestedTask +from openhands.server.types import AppMode + + +class BitBucketReposMixin(BitBucketMixinBase): + """ + Mixin for BitBucket repository-related operations + """ + + async def search_repositories( + self, query: str, per_page: int, sort: str, order: str, public: bool + ) -> list[Repository]: + """Search for repositories.""" + repositories = [] + + if public: + # Extract workspace and repo from URL + # URL format: https://{domain}/{workspace}/{repo}/{additional_params} + # Split by '/' and find workspace and repo parts + url_parts = query.split('/') + if len(url_parts) >= 5: # https:, '', domain, workspace, repo + workspace_slug = url_parts[3] + repo_name = url_parts[4] + + repo = await self.get_repository_details_from_repo_name( + f'{workspace_slug}/{repo_name}' + ) + repositories.append(repo) + + return repositories + + # Search for repos once workspace prefix exists + if '/' in query: + workspace_slug, repo_query = query.split('/', 1) + return await self.get_paginated_repos( + 1, per_page, sort, workspace_slug, repo_query + ) + + all_installations = await self.get_installations() + + # Workspace prefix isn't complete. Search workspace names and repos underneath each workspace + matching_workspace_slugs = [ + installation for installation in all_installations if query in installation + ] + for workspace_slug in matching_workspace_slugs: + # Get repositories where query matches workspace name + try: + repos = await self.get_paginated_repos( + 1, per_page, sort, workspace_slug + ) + repositories.extend(repos) + except Exception: + continue + + for workspace_slug in all_installations: + # Get repositories in all workspaces where query matches repo name + try: + repos = await self.get_paginated_repos( + 1, per_page, sort, workspace_slug, query + ) + repositories.extend(repos) + except Exception: + continue + + return repositories + + async def _get_user_workspaces(self) -> list[dict[str, Any]]: + """Get all workspaces the user has access to""" + url = f'{self.BASE_URL}/workspaces' + data, _ = await self._make_request(url) + return data.get('values', []) + + async def get_installations( + self, query: str | None = None, limit: int = 100 + ) -> list[str]: + workspaces_url = f'{self.BASE_URL}/workspaces' + params = {} + if query: + params['q'] = f'name~"{query}"' + + workspaces = await self._fetch_paginated_data(workspaces_url, params, limit) + + installations: list[str] = [] + for workspace in workspaces: + installations.append(workspace['slug']) + + return installations + + async def get_paginated_repos( + self, + page: int, + per_page: int, + sort: str, + installation_id: str | None, + query: str | None = None, + ) -> list[Repository]: + """Get paginated repositories for a specific workspace. + + Args: + page: The page number to fetch + per_page: The number of repositories per page + sort: The sort field ('pushed', 'updated', 'created', 'full_name') + installation_id: The workspace slug to fetch repositories from (as int, will be converted to string) + + Returns: + A list of Repository objects + """ + if not installation_id: + return [] + + # Convert installation_id to string for use as workspace_slug + workspace_slug = installation_id + workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}' + + # Map sort parameter to Bitbucket API compatible values + bitbucket_sort = sort + if sort == 'pushed': + # Bitbucket doesn't support 'pushed', use 'updated_on' instead + bitbucket_sort = '-updated_on' # Use negative prefix for descending order + elif sort == 'updated': + bitbucket_sort = '-updated_on' + elif sort == 'created': + bitbucket_sort = '-created_on' + elif sort == 'full_name': + bitbucket_sort = 'name' # Bitbucket uses 'name' not 'full_name' + else: + # Default to most recently updated first + bitbucket_sort = '-updated_on' + + params = { + 'pagelen': per_page, + 'page': page, + 'sort': bitbucket_sort, + } + + if query: + params['q'] = f'name~"{query}"' + + response, headers = await self._make_request(workspace_repos_url, params) + + # Extract repositories from the response + repos = response.get('values', []) + + # Extract next URL from response + next_link = response.get('next', '') + + # Format the link header in a way that the frontend can understand + # The frontend expects a format like: ; rel="next" + # where the URL contains a page parameter + formatted_link_header = '' + if next_link: + # Extract the page number from the next URL if possible + page_match = re.search(r'[?&]page=(\d+)', next_link) + if page_match: + next_page = page_match.group(1) + # Format it in a way that extractNextPageFromLink in frontend can parse + formatted_link_header = ( + f'<{workspace_repos_url}?page={next_page}>; rel="next"' + ) + else: + # If we can't extract the page, just use the next URL as is + formatted_link_header = f'<{next_link}>; rel="next"' + + repositories = [ + self._parse_repository(repo, link_header=formatted_link_header) + for repo in repos + ] + + return repositories + + async def get_all_repositories( + self, sort: str, app_mode: AppMode + ) -> list[Repository]: + """Get repositories for the authenticated user using workspaces endpoint. + + This method gets all repositories (both public and private) that the user has access to + by iterating through their workspaces and fetching repositories from each workspace. + This approach is more comprehensive and efficient than the previous implementation + that made separate calls for public and private repositories. + """ + MAX_REPOS = 1000 + PER_PAGE = 100 # Maximum allowed by Bitbucket API + repositories: list[Repository] = [] + + # Get user's workspaces with pagination + workspaces_url = f'{self.BASE_URL}/workspaces' + workspaces = await self._fetch_paginated_data(workspaces_url, {}, MAX_REPOS) + + for workspace in workspaces: + workspace_slug = workspace.get('slug') + if not workspace_slug: + continue + + # Get repositories for this workspace with pagination + workspace_repos_url = f'{self.BASE_URL}/repositories/{workspace_slug}' + + # Map sort parameter to Bitbucket API compatible values and ensure descending order + # to show most recently changed repos at the top + bitbucket_sort = sort + if sort == 'pushed': + # Bitbucket doesn't support 'pushed', use 'updated_on' instead + bitbucket_sort = ( + '-updated_on' # Use negative prefix for descending order + ) + elif sort == 'updated': + bitbucket_sort = '-updated_on' + elif sort == 'created': + bitbucket_sort = '-created_on' + elif sort == 'full_name': + bitbucket_sort = 'name' # Bitbucket uses 'name' not 'full_name' + else: + # Default to most recently updated first + bitbucket_sort = '-updated_on' + + params = { + 'pagelen': PER_PAGE, + 'sort': bitbucket_sort, + } + + # Fetch all repositories for this workspace with pagination + workspace_repos = await self._fetch_paginated_data( + workspace_repos_url, params, MAX_REPOS - len(repositories) + ) + + for repo in workspace_repos: + repositories.append(self._parse_repository(repo)) + + # Stop if we've reached the maximum number of repositories + if len(repositories) >= MAX_REPOS: + break + + # Stop if we've reached the maximum number of repositories + if len(repositories) >= MAX_REPOS: + break + + return repositories + + async def get_suggested_tasks(self) -> list[SuggestedTask]: + """Get suggested tasks for the authenticated user across all repositories.""" + # TODO: implemented suggested tasks + return []