Files
OpenHands/openhands/app_server/app_conversation/app_conversation_service_base.py
openhands 3a234b78a4 feat: add registered_marketplaces to Settings for multiple marketplace support
This PR introduces support for registering multiple marketplaces with explicit
auto-load semantics, providing an alternative to the single marketplace_path
approach in PR #13117.

## Changes

### New Models

**MarketplaceRegistration** - Registration for a plugin marketplace:
- name: Identifier for this marketplace registration
- source: Marketplace source (github:owner/repo, git URL, or local path)
- ref: Optional branch, tag, or commit
- repo_path: Subdirectory path for monorepos
- auto_load: 'all' to load plugins at conversation start, None for on-demand

### Updated Settings Model

Added registered_marketplaces field to Settings (list of MarketplaceRegistration).
This allows users to configure multiple marketplaces with different loading
behaviors.

### Updated Skill Loading

- skill_loader: Added registered_marketplaces parameter to pass marketplace
  registrations to the agent-server API
- app_conversation_service_base: Added registered_marketplaces parameter to
  load_and_merge_all_skills method

### Tests

- Added comprehensive tests for MarketplaceRegistration model
- Added tests for Settings.registered_marketplaces field
- Added tests for skill_loader marketplace handling

## Key Behaviors

- Marketplace resolution composes instance → org → user (additive)
- auto_load='all' loads all plugins at conversation start
- auto_load=None registers marketplace for on-demand resolution
- Path validation rejects absolute paths and directory traversal

## Dependencies

This PR is designed to work with SDK PR #2495 which provides:
- MarketplaceRegistry class for managing registered marketplaces
- Plugin resolution via 'plugin-name@marketplace-name' syntax
- Lazy fetching and caching of marketplace manifests

## Related

- Alternative to #13117 (marketplace_path setting)
- Leverages SDK PR OpenHands/software-agent-sdk#2495
- Enables #12916 (org-level default resources)
- Aligns with #13188 (instance-default org proposal)
- Supports #10947 (OpenHands configuration proposal)
2026-03-19 17:47:24 +00:00

570 lines
21 KiB
Python

import logging
import os
import tempfile
from abc import ABC
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, AsyncGenerator
from uuid import UUID
if TYPE_CHECKING:
import httpx
import base62
from openhands.app_server.app_conversation.app_conversation_models import (
AgentType,
AppConversationStartTask,
AppConversationStartTaskStatus,
)
from openhands.app_server.app_conversation.app_conversation_service import (
AppConversationService,
)
from openhands.app_server.app_conversation.skill_loader import (
build_org_config,
build_sandbox_config,
load_skills_from_agent_server,
)
from openhands.app_server.sandbox.sandbox_models import SandboxInfo
from openhands.app_server.user.user_context import UserContext
from openhands.sdk import Agent
from openhands.sdk.context.agent_context import AgentContext
from openhands.sdk.context.condenser import LLMSummarizingCondenser
from openhands.sdk.context.skills import Skill
from openhands.sdk.llm import LLM
from openhands.sdk.security.analyzer import SecurityAnalyzerBase
from openhands.sdk.security.confirmation_policy import (
AlwaysConfirm,
ConfirmationPolicyBase,
ConfirmRisky,
NeverConfirm,
)
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.sdk.workspace.remote.async_remote_workspace import AsyncRemoteWorkspace
_logger = logging.getLogger(__name__)
PRE_COMMIT_HOOK = '.git/hooks/pre-commit'
PRE_COMMIT_LOCAL = '.git/hooks/pre-commit.local'
def get_project_dir(
working_dir: str,
selected_repository: str | None = None,
) -> str:
"""Get the project root directory for a conversation.
When a repository is selected, the project root is the cloned repo directory
at {working_dir}/{repo_name}. This is the directory that contains the
`.openhands/` configuration (setup.sh, pre-commit.sh, skills/, etc.).
Without a repository, the project root is the working_dir itself.
This must be used consistently for ALL features that depend on the project root:
- workspace.working_dir (terminal CWD, file editor root, etc.)
- .openhands/setup.sh execution
- .openhands/pre-commit.sh (git hooks setup)
- .openhands/skills/ (project skills)
- PLAN.md path
Args:
working_dir: Base working directory path in the sandbox
(e.g., '/workspace/project' from sandbox_spec)
selected_repository: Repository name (e.g., 'OpenHands/software-agent-sdk')
If provided, the repo name is appended to working_dir.
Returns:
The project root directory path.
"""
if selected_repository:
repo_name = selected_repository.split('/')[-1]
return f'{working_dir}/{repo_name}'
return working_dir
@dataclass
class AppConversationServiceBase(AppConversationService, ABC):
"""App Conversation service which adds git specific functionality.
Sets up repositories and installs hooks
"""
init_git_in_empty_workspace: bool
user_context: UserContext
async def load_and_merge_all_skills(
self,
sandbox: SandboxInfo,
selected_repository: str | None,
project_dir: str,
agent_server_url: str,
registered_marketplaces: list | None = None,
) -> list[Skill]:
"""Load skills from all sources via the agent-server.
This method calls the agent-server's /api/skills endpoint to load and
merge skills from all sources. The agent-server handles:
- Public skills (from OpenHands/skills GitHub repo)
- User skills (from ~/.openhands/skills/)
- Organization skills (from {org}/.openhands repo)
- Project/repo skills (from repo .agents/skills/, .openhands/microagents/, and legacy .openhands/skills/)
- Sandbox skills (from exposed URLs)
- Registered marketplaces (from user settings)
Args:
sandbox: SandboxInfo containing exposed URLs and agent-server URL
selected_repository: Repository name or None
project_dir: Project root directory (resolved via get_project_dir).
agent_server_url: Agent-server URL (required)
registered_marketplaces: Optional list of MarketplaceRegistration objects
from user settings. Marketplaces with auto_load='all' will have
their plugins loaded automatically.
Returns:
List of merged Skill objects from all sources, or empty list on failure
"""
try:
_logger.debug('Loading skills for V1 conversation via agent-server')
if not agent_server_url:
_logger.warning('No agent-server URL available, cannot load skills')
return []
# Build org config (authentication handled by app-server)
org_config = await build_org_config(selected_repository, self.user_context)
# Build sandbox config (exposed URLs)
sandbox_config = build_sandbox_config(sandbox)
# Single API call to agent-server for ALL skills
all_skills = await load_skills_from_agent_server(
agent_server_url=agent_server_url,
session_api_key=sandbox.session_api_key,
project_dir=project_dir,
org_config=org_config,
sandbox_config=sandbox_config,
load_public=True,
load_user=True,
load_project=True,
load_org=True,
registered_marketplaces=registered_marketplaces,
)
_logger.info(
f'Loaded {len(all_skills)} total skills from agent-server: '
f'{[s.name for s in all_skills]}'
)
return all_skills
except Exception as e:
_logger.warning(f'Failed to load skills: {e}', exc_info=True)
# Return empty list on failure - skills will be loaded again later if needed
return []
def _create_agent_with_skills(self, agent, skills: list[Skill]):
"""Create or update agent with skills in its context.
Args:
agent: The agent to update
skills: List of Skill objects to add to agent context
Returns:
Updated agent with skills in context
"""
if agent.agent_context:
# Merge with existing context (new skills override existing ones)
existing_skills = agent.agent_context.skills
all_skills = self._merge_skills([existing_skills, skills])
agent = agent.model_copy(
update={
'agent_context': agent.agent_context.model_copy(
update={'skills': all_skills}
)
}
)
else:
# Create new context
agent_context = AgentContext(skills=skills)
agent = agent.model_copy(update={'agent_context': agent_context})
return agent
def _merge_skills(self, skill_lists: list[list[Skill]]) -> list[Skill]:
"""Merge multiple skill lists, avoiding duplicates by name.
Later lists take precedence over earlier lists for duplicate names.
Args:
skill_lists: List of skill lists to merge
Returns:
Deduplicated list of skills with later lists overriding earlier ones
"""
skills_by_name: dict[str, Skill] = {}
for skill_list in skill_lists:
for skill in skill_list:
skills_by_name[skill.name] = skill
return list(skills_by_name.values())
async def _load_skills_and_update_agent(
self,
sandbox: SandboxInfo,
agent: Agent,
remote_workspace: AsyncRemoteWorkspace,
selected_repository: str | None,
project_dir: str,
):
"""Load all skills and update agent with them.
Args:
agent: The agent to update
remote_workspace: AsyncRemoteWorkspace for loading repo skills
selected_repository: Repository name or None (used for org config)
project_dir: Project root directory (already resolved via get_project_dir).
Returns:
Updated agent with skills loaded into context
"""
agent_server_url = remote_workspace.host
all_skills = await self.load_and_merge_all_skills(
sandbox,
selected_repository,
project_dir,
agent_server_url,
)
# Update agent with skills
agent = self._create_agent_with_skills(agent, all_skills)
return agent
async def run_setup_scripts(
self,
task: AppConversationStartTask,
sandbox: SandboxInfo,
workspace: AsyncRemoteWorkspace,
agent_server_url: str,
) -> AsyncGenerator[AppConversationStartTask, None]:
task.status = AppConversationStartTaskStatus.PREPARING_REPOSITORY
yield task
await self.clone_or_init_git_repo(task, workspace)
# Compute the project root — the cloned repo directory when a repo is
# selected, or the sandbox working_dir otherwise. This must be used
# for all .openhands/ features (setup.sh, pre-commit.sh, skills).
project_dir = get_project_dir(
workspace.working_dir, task.request.selected_repository
)
task.status = AppConversationStartTaskStatus.RUNNING_SETUP_SCRIPT
yield task
await self.maybe_run_setup_script(workspace, project_dir)
task.status = AppConversationStartTaskStatus.SETTING_UP_GIT_HOOKS
yield task
await self.maybe_setup_git_hooks(workspace, project_dir)
task.status = AppConversationStartTaskStatus.SETTING_UP_SKILLS
yield task
await self.load_and_merge_all_skills(
sandbox,
task.request.selected_repository,
project_dir,
agent_server_url,
)
async def _configure_git_user_settings(
self,
workspace: AsyncRemoteWorkspace,
) -> None:
"""Configure git global user settings from user preferences.
Reads git_user_name and git_user_email from user settings and
configures them as git global settings in the workspace.
Args:
workspace: The remote workspace to configure git settings in.
"""
try:
user_info = await self.user_context.get_user_info()
if user_info.git_user_name:
cmd = f'git config --global user.name "{user_info.git_user_name}"'
result = await workspace.execute_command(cmd, workspace.working_dir)
if result.exit_code:
_logger.warning(f'Git config user.name failed: {result.stderr}')
else:
_logger.info(
f'Git configured with user.name={user_info.git_user_name}'
)
if user_info.git_user_email:
cmd = f'git config --global user.email "{user_info.git_user_email}"'
result = await workspace.execute_command(cmd, workspace.working_dir)
if result.exit_code:
_logger.warning(f'Git config user.email failed: {result.stderr}')
else:
_logger.info(
f'Git configured with user.email={user_info.git_user_email}'
)
except Exception as e:
_logger.warning(f'Failed to configure git user settings: {e}')
async def clone_or_init_git_repo(
self,
task: AppConversationStartTask,
workspace: AsyncRemoteWorkspace,
):
request = task.request
# Create the projects directory if it does not exist yet
parent = Path(workspace.working_dir).parent
result = await workspace.execute_command(
f'mkdir {workspace.working_dir}', parent
)
if result.exit_code:
_logger.warning(f'mkdir failed: {result.stderr}')
# Configure git user settings from user preferences
await self._configure_git_user_settings(workspace)
if not request.selected_repository:
if self.init_git_in_empty_workspace:
_logger.debug('Initializing a new git repository in the workspace.')
cmd = (
'git init && git config --global '
f'--add safe.directory {workspace.working_dir}'
)
result = await workspace.execute_command(cmd, workspace.working_dir)
if result.exit_code:
_logger.warning(f'Git init failed: {result.stderr}')
else:
_logger.info('Not initializing a new git repository.')
return
remote_repo_url: str = await self.user_context.get_authenticated_git_url(
request.selected_repository
)
if not remote_repo_url:
raise ValueError('Missing either Git token or valid repository')
dir_name = request.selected_repository.split('/')[-1]
# Clone the repo - this is the slow part!
clone_command = f'git clone {remote_repo_url} {dir_name}'
result = await workspace.execute_command(
clone_command, workspace.working_dir, 120
)
if result.exit_code:
_logger.warning(f'Git clone failed: {result.stderr}')
# Checkout the appropriate branch
if request.selected_branch:
checkout_command = f'git checkout {request.selected_branch}'
else:
# Generate a random branch name to avoid conflicts
random_str = base62.encodebytes(os.urandom(16))
openhands_workspace_branch = f'openhands-workspace-{random_str}'
checkout_command = f'git checkout -b {openhands_workspace_branch}'
git_dir = Path(workspace.working_dir) / dir_name
result = await workspace.execute_command(checkout_command, git_dir)
if result.exit_code:
_logger.warning(f'Git checkout failed: {result.stderr}')
async def maybe_run_setup_script(
self,
workspace: AsyncRemoteWorkspace,
project_dir: str,
):
"""Run .openhands/setup.sh if it exists in the project root.
Args:
workspace: Remote workspace for command execution.
project_dir: Project root directory (repo root when a repo is selected).
"""
setup_script = project_dir + '/.openhands/setup.sh'
await workspace.execute_command(
f'chmod +x {setup_script} && source {setup_script}',
cwd=project_dir,
timeout=600,
)
async def maybe_setup_git_hooks(
self,
workspace: AsyncRemoteWorkspace,
project_dir: str,
):
"""Set up git hooks if .openhands/pre-commit.sh exists in the project root.
Args:
workspace: Remote workspace for command execution.
project_dir: Project root directory (repo root when a repo is selected).
"""
command = 'mkdir -p .git/hooks && chmod +x .openhands/pre-commit.sh'
result = await workspace.execute_command(command, project_dir)
if result.exit_code:
return
# Check if there's an existing pre-commit hook
with tempfile.TemporaryFile(mode='w+t') as temp_file:
result = await workspace.file_download(PRE_COMMIT_HOOK, str(temp_file))
if result.success:
_logger.info('Preserving existing pre-commit hook')
# an existing pre-commit hook exists
if 'This hook was installed by OpenHands' not in temp_file.read():
# Move the existing hook to pre-commit.local
command = (
f'mv {PRE_COMMIT_HOOK} {PRE_COMMIT_LOCAL} &&'
f'chmod +x {PRE_COMMIT_LOCAL}'
)
result = await workspace.execute_command(command, project_dir)
if result.exit_code != 0:
_logger.error(
f'Failed to preserve existing pre-commit hook: {result.stderr}',
)
return
# write the pre-commit hook
await workspace.file_upload(
source_path=Path(__file__).parent / 'git' / 'pre-commit.sh',
destination_path=PRE_COMMIT_HOOK,
)
# Make the pre-commit hook executable
result = await workspace.execute_command(f'chmod +x {PRE_COMMIT_HOOK}')
if result.exit_code:
_logger.error(f'Failed to make pre-commit hook executable: {result.stderr}')
return
_logger.info('Git pre-commit hook installed successfully')
def _create_condenser(
self,
llm: LLM,
agent_type: AgentType,
condenser_max_size: int | None,
) -> LLMSummarizingCondenser:
"""Create a condenser based on user settings and agent type.
Args:
llm: The LLM instance to use for condensation
agent_type: Type of agent (PLAN or DEFAULT)
condenser_max_size: condenser_max_size setting
Returns:
Configured LLMSummarizingCondenser instance
"""
# LLMSummarizingCondenser SDK defaults: max_size=240, keep_first=2
condenser_kwargs = {
'llm': llm.model_copy(
update={
'usage_id': (
'condenser'
if agent_type == AgentType.DEFAULT
else 'planning_condenser'
)
}
),
}
# Only override max_size if user has a custom value
if condenser_max_size is not None:
condenser_kwargs['max_size'] = condenser_max_size
condenser = LLMSummarizingCondenser(**condenser_kwargs)
return condenser
def _create_security_analyzer_from_string(
self, security_analyzer_str: str | None
) -> SecurityAnalyzerBase | None:
"""Convert security analyzer string from settings to SecurityAnalyzerBase instance.
Args:
security_analyzer_str: String value from settings. Valid values:
- "llm" -> LLMSecurityAnalyzer
- "none" or None -> None
- Other values -> None (unsupported analyzers are ignored)
Returns:
SecurityAnalyzerBase instance or None
"""
if not security_analyzer_str or security_analyzer_str.lower() == 'none':
return None
if security_analyzer_str.lower() == 'llm':
return LLMSecurityAnalyzer()
# For unknown values, log a warning and return None
_logger.warning(
f'Unknown security analyzer value: {security_analyzer_str}. '
'Supported values: "llm", "none". Defaulting to None.'
)
return None
def _select_confirmation_policy(
self, confirmation_mode: bool, security_analyzer: str | None
) -> ConfirmationPolicyBase:
"""Choose confirmation policy using only mode flag and analyzer string."""
if not confirmation_mode:
return NeverConfirm()
analyzer_kind = (security_analyzer or '').lower()
if analyzer_kind == 'llm':
return ConfirmRisky()
return AlwaysConfirm()
async def _set_security_analyzer_from_settings(
self,
agent_server_url: str,
session_api_key: str | None,
conversation_id: UUID,
security_analyzer_str: str | None,
httpx_client: 'httpx.AsyncClient',
) -> None:
"""Set security analyzer on conversation using only the analyzer string.
Args:
agent_server_url: URL of the agent server
session_api_key: Session API key for authentication
conversation_id: ID of the conversation to update
security_analyzer_str: String value from settings
httpx_client: HTTP client for making API requests
"""
if session_api_key is None:
return
security_analyzer = self._create_security_analyzer_from_string(
security_analyzer_str
)
# Only make API call if we have a security analyzer to set
# (None is the default, so we can skip the call if it's None)
if security_analyzer is None:
return
try:
# Prepare the request payload
payload = {'security_analyzer': security_analyzer.model_dump()}
# Call agent server API to set security analyzer
response = await httpx_client.post(
f'{agent_server_url}/api/conversations/{conversation_id}/security_analyzer',
json=payload,
headers={'X-Session-API-Key': session_api_key},
timeout=30.0,
)
response.raise_for_status()
_logger.info(
f'Successfully set security analyzer for conversation {conversation_id}'
)
except Exception as e:
# Log error but don't fail conversation creation
_logger.warning(
f'Failed to set security analyzer for conversation {conversation_id}: {e}',
exc_info=True,
)