import logging import os import tempfile from abc import ABC from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, AsyncGenerator from uuid import UUID if TYPE_CHECKING: import httpx import base62 from openhands.app_server.app_conversation.app_conversation_models import ( AgentType, AppConversationStartTask, AppConversationStartTaskStatus, ) from openhands.app_server.app_conversation.app_conversation_service import ( AppConversationService, ) from openhands.app_server.app_conversation.skill_loader import ( build_org_config, build_sandbox_config, load_skills_from_agent_server, ) from openhands.app_server.sandbox.sandbox_models import SandboxInfo from openhands.app_server.user.user_context import UserContext from openhands.sdk import Agent from openhands.sdk.context.agent_context import AgentContext from openhands.sdk.context.condenser import LLMSummarizingCondenser from openhands.sdk.context.skills import Skill from openhands.sdk.llm import LLM from openhands.sdk.security.analyzer import SecurityAnalyzerBase from openhands.sdk.security.confirmation_policy import ( AlwaysConfirm, ConfirmationPolicyBase, ConfirmRisky, NeverConfirm, ) from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer from openhands.sdk.workspace.remote.async_remote_workspace import AsyncRemoteWorkspace _logger = logging.getLogger(__name__) PRE_COMMIT_HOOK = '.git/hooks/pre-commit' PRE_COMMIT_LOCAL = '.git/hooks/pre-commit.local' def get_project_dir( working_dir: str, selected_repository: str | None = None, ) -> str: """Get the project root directory for a conversation. When a repository is selected, the project root is the cloned repo directory at {working_dir}/{repo_name}. This is the directory that contains the `.openhands/` configuration (setup.sh, pre-commit.sh, skills/, etc.). Without a repository, the project root is the working_dir itself. This must be used consistently for ALL features that depend on the project root: - workspace.working_dir (terminal CWD, file editor root, etc.) - .openhands/setup.sh execution - .openhands/pre-commit.sh (git hooks setup) - .openhands/skills/ (project skills) - PLAN.md path Args: working_dir: Base working directory path in the sandbox (e.g., '/workspace/project' from sandbox_spec) selected_repository: Repository name (e.g., 'OpenHands/software-agent-sdk') If provided, the repo name is appended to working_dir. Returns: The project root directory path. """ if selected_repository: repo_name = selected_repository.split('/')[-1] return f'{working_dir}/{repo_name}' return working_dir @dataclass class AppConversationServiceBase(AppConversationService, ABC): """App Conversation service which adds git specific functionality. Sets up repositories and installs hooks """ init_git_in_empty_workspace: bool user_context: UserContext async def load_and_merge_all_skills( self, sandbox: SandboxInfo, selected_repository: str | None, project_dir: str, agent_server_url: str, ) -> list[Skill]: """Load skills from all sources via the agent-server. This method calls the agent-server's /api/skills endpoint to load and merge skills from all sources. The agent-server handles: - Public skills (from OpenHands/skills GitHub repo) - User skills (from ~/.openhands/skills/) - Organization skills (from {org}/.openhands repo) - Project/repo skills (from repo .agents/skills/, .openhands/microagents/, and legacy .openhands/skills/) - Sandbox skills (from exposed URLs) Args: sandbox: SandboxInfo containing exposed URLs and agent-server URL selected_repository: Repository name or None project_dir: Project root directory (resolved via get_project_dir). agent_server_url: Agent-server URL (required) Returns: List of merged Skill objects from all sources, or empty list on failure """ try: _logger.debug('Loading skills for V1 conversation via agent-server') if not agent_server_url: _logger.warning('No agent-server URL available, cannot load skills') return [] # Build org config (authentication handled by app-server) org_config = await build_org_config(selected_repository, self.user_context) # Build sandbox config (exposed URLs) sandbox_config = build_sandbox_config(sandbox) # Single API call to agent-server for ALL skills all_skills = await load_skills_from_agent_server( agent_server_url=agent_server_url, session_api_key=sandbox.session_api_key, project_dir=project_dir, org_config=org_config, sandbox_config=sandbox_config, load_public=True, load_user=True, load_project=True, load_org=True, ) _logger.info( f'Loaded {len(all_skills)} total skills from agent-server: ' f'{[s.name for s in all_skills]}' ) return all_skills except Exception as e: _logger.warning(f'Failed to load skills: {e}', exc_info=True) # Return empty list on failure - skills will be loaded again later if needed return [] def _create_agent_with_skills(self, agent, skills: list[Skill]): """Create or update agent with skills in its context. Args: agent: The agent to update skills: List of Skill objects to add to agent context Returns: Updated agent with skills in context """ if agent.agent_context: # Merge with existing context (new skills override existing ones) existing_skills = agent.agent_context.skills all_skills = self._merge_skills([existing_skills, skills]) agent = agent.model_copy( update={ 'agent_context': agent.agent_context.model_copy( update={'skills': all_skills} ) } ) else: # Create new context agent_context = AgentContext(skills=skills) agent = agent.model_copy(update={'agent_context': agent_context}) return agent def _merge_skills(self, skill_lists: list[list[Skill]]) -> list[Skill]: """Merge multiple skill lists, avoiding duplicates by name. Later lists take precedence over earlier lists for duplicate names. Args: skill_lists: List of skill lists to merge Returns: Deduplicated list of skills with later lists overriding earlier ones """ skills_by_name: dict[str, Skill] = {} for skill_list in skill_lists: for skill in skill_list: skills_by_name[skill.name] = skill return list(skills_by_name.values()) async def _load_skills_and_update_agent( self, sandbox: SandboxInfo, agent: Agent, remote_workspace: AsyncRemoteWorkspace, selected_repository: str | None, project_dir: str, ): """Load all skills and update agent with them. Args: agent: The agent to update remote_workspace: AsyncRemoteWorkspace for loading repo skills selected_repository: Repository name or None (used for org config) project_dir: Project root directory (already resolved via get_project_dir). Returns: Updated agent with skills loaded into context """ agent_server_url = remote_workspace.host all_skills = await self.load_and_merge_all_skills( sandbox, selected_repository, project_dir, agent_server_url, ) # Update agent with skills agent = self._create_agent_with_skills(agent, all_skills) return agent async def run_setup_scripts( self, task: AppConversationStartTask, sandbox: SandboxInfo, workspace: AsyncRemoteWorkspace, agent_server_url: str, ) -> AsyncGenerator[AppConversationStartTask, None]: task.status = AppConversationStartTaskStatus.PREPARING_REPOSITORY yield task await self.clone_or_init_git_repo(task, workspace) # Compute the project root — the cloned repo directory when a repo is # selected, or the sandbox working_dir otherwise. This must be used # for all .openhands/ features (setup.sh, pre-commit.sh, skills). project_dir = get_project_dir( workspace.working_dir, task.request.selected_repository ) task.status = AppConversationStartTaskStatus.RUNNING_SETUP_SCRIPT yield task await self.maybe_run_setup_script(workspace, project_dir) task.status = AppConversationStartTaskStatus.SETTING_UP_GIT_HOOKS yield task await self.maybe_setup_git_hooks(workspace, project_dir) task.status = AppConversationStartTaskStatus.SETTING_UP_SKILLS yield task await self.load_and_merge_all_skills( sandbox, task.request.selected_repository, project_dir, agent_server_url, ) async def _configure_git_user_settings( self, workspace: AsyncRemoteWorkspace, ) -> None: """Configure git global user settings from user preferences. Reads git_user_name and git_user_email from user settings and configures them as git global settings in the workspace. Args: workspace: The remote workspace to configure git settings in. """ try: user_info = await self.user_context.get_user_info() if user_info.git_user_name: cmd = f'git config --global user.name "{user_info.git_user_name}"' result = await workspace.execute_command(cmd, workspace.working_dir) if result.exit_code: _logger.warning(f'Git config user.name failed: {result.stderr}') else: _logger.info( f'Git configured with user.name={user_info.git_user_name}' ) if user_info.git_user_email: cmd = f'git config --global user.email "{user_info.git_user_email}"' result = await workspace.execute_command(cmd, workspace.working_dir) if result.exit_code: _logger.warning(f'Git config user.email failed: {result.stderr}') else: _logger.info( f'Git configured with user.email={user_info.git_user_email}' ) except Exception as e: _logger.warning(f'Failed to configure git user settings: {e}') async def clone_or_init_git_repo( self, task: AppConversationStartTask, workspace: AsyncRemoteWorkspace, ): request = task.request # Create the projects directory if it does not exist yet parent = Path(workspace.working_dir).parent result = await workspace.execute_command( f'mkdir {workspace.working_dir}', parent ) if result.exit_code: _logger.warning(f'mkdir failed: {result.stderr}') # Configure git user settings from user preferences await self._configure_git_user_settings(workspace) if not request.selected_repository: if self.init_git_in_empty_workspace: _logger.debug('Initializing a new git repository in the workspace.') cmd = ( 'git init && git config --global ' f'--add safe.directory {workspace.working_dir}' ) result = await workspace.execute_command(cmd, workspace.working_dir) if result.exit_code: _logger.warning(f'Git init failed: {result.stderr}') else: _logger.info('Not initializing a new git repository.') return remote_repo_url: str = await self.user_context.get_authenticated_git_url( request.selected_repository ) if not remote_repo_url: raise ValueError('Missing either Git token or valid repository') dir_name = request.selected_repository.split('/')[-1] # Clone the repo - this is the slow part! clone_command = f'git clone {remote_repo_url} {dir_name}' result = await workspace.execute_command( clone_command, workspace.working_dir, 120 ) if result.exit_code: _logger.warning(f'Git clone failed: {result.stderr}') # Checkout the appropriate branch if request.selected_branch: checkout_command = f'git checkout {request.selected_branch}' else: # Generate a random branch name to avoid conflicts random_str = base62.encodebytes(os.urandom(16)) openhands_workspace_branch = f'openhands-workspace-{random_str}' checkout_command = f'git checkout -b {openhands_workspace_branch}' git_dir = Path(workspace.working_dir) / dir_name result = await workspace.execute_command(checkout_command, git_dir) if result.exit_code: _logger.warning(f'Git checkout failed: {result.stderr}') async def maybe_run_setup_script( self, workspace: AsyncRemoteWorkspace, project_dir: str, ): """Run .openhands/setup.sh if it exists in the project root. Args: workspace: Remote workspace for command execution. project_dir: Project root directory (repo root when a repo is selected). """ setup_script = project_dir + '/.openhands/setup.sh' await workspace.execute_command( f'chmod +x {setup_script} && source {setup_script}', cwd=project_dir, timeout=600, ) async def maybe_setup_git_hooks( self, workspace: AsyncRemoteWorkspace, project_dir: str, ): """Set up git hooks if .openhands/pre-commit.sh exists in the project root. Args: workspace: Remote workspace for command execution. project_dir: Project root directory (repo root when a repo is selected). """ command = 'mkdir -p .git/hooks && chmod +x .openhands/pre-commit.sh' result = await workspace.execute_command(command, project_dir) if result.exit_code: return # Check if there's an existing pre-commit hook with tempfile.TemporaryFile(mode='w+t') as temp_file: result = await workspace.file_download(PRE_COMMIT_HOOK, str(temp_file)) if result.get('success'): _logger.info('Preserving existing pre-commit hook') # an existing pre-commit hook exists if 'This hook was installed by OpenHands' not in temp_file.read(): # Move the existing hook to pre-commit.local command = ( f'mv {PRE_COMMIT_HOOK} {PRE_COMMIT_LOCAL} &&' f'chmod +x {PRE_COMMIT_LOCAL}' ) result = await workspace.execute_command(command, project_dir) if result.exit_code != 0: _logger.error( f'Failed to preserve existing pre-commit hook: {result.stderr}', ) return # write the pre-commit hook await workspace.file_upload( source_path=Path(__file__).parent / 'git' / 'pre-commit.sh', destination_path=PRE_COMMIT_HOOK, ) # Make the pre-commit hook executable result = await workspace.execute_command(f'chmod +x {PRE_COMMIT_HOOK}') if result.exit_code: _logger.error(f'Failed to make pre-commit hook executable: {result.stderr}') return _logger.info('Git pre-commit hook installed successfully') def _create_condenser( self, llm: LLM, agent_type: AgentType, condenser_max_size: int | None, ) -> LLMSummarizingCondenser: """Create a condenser based on user settings and agent type. Args: llm: The LLM instance to use for condensation agent_type: Type of agent (PLAN or DEFAULT) condenser_max_size: condenser_max_size setting Returns: Configured LLMSummarizingCondenser instance """ # LLMSummarizingCondenser SDK defaults: max_size=240, keep_first=2 condenser_kwargs = { 'llm': llm.model_copy( update={ 'usage_id': ( 'condenser' if agent_type == AgentType.DEFAULT else 'planning_condenser' ) } ), } # Only override max_size if user has a custom value if condenser_max_size is not None: condenser_kwargs['max_size'] = condenser_max_size condenser = LLMSummarizingCondenser(**condenser_kwargs) return condenser def _create_security_analyzer_from_string( self, security_analyzer_str: str | None ) -> SecurityAnalyzerBase | None: """Convert security analyzer string from settings to SecurityAnalyzerBase instance. Args: security_analyzer_str: String value from settings. Valid values: - "llm" -> LLMSecurityAnalyzer - "none" or None -> None - Other values -> None (unsupported analyzers are ignored) Returns: SecurityAnalyzerBase instance or None """ if not security_analyzer_str or security_analyzer_str.lower() == 'none': return None if security_analyzer_str.lower() == 'llm': return LLMSecurityAnalyzer() # For unknown values, log a warning and return None _logger.warning( f'Unknown security analyzer value: {security_analyzer_str}. ' 'Supported values: "llm", "none". Defaulting to None.' ) return None def _select_confirmation_policy( self, confirmation_mode: bool, security_analyzer: str | None ) -> ConfirmationPolicyBase: """Choose confirmation policy using only mode flag and analyzer string.""" if not confirmation_mode: return NeverConfirm() analyzer_kind = (security_analyzer or '').lower() if analyzer_kind == 'llm': return ConfirmRisky() return AlwaysConfirm() async def _set_security_analyzer_from_settings( self, agent_server_url: str, session_api_key: str | None, conversation_id: UUID, security_analyzer_str: str | None, httpx_client: 'httpx.AsyncClient', ) -> None: """Set security analyzer on conversation using only the analyzer string. Args: agent_server_url: URL of the agent server session_api_key: Session API key for authentication conversation_id: ID of the conversation to update security_analyzer_str: String value from settings httpx_client: HTTP client for making API requests """ if session_api_key is None: return security_analyzer = self._create_security_analyzer_from_string( security_analyzer_str ) # Only make API call if we have a security analyzer to set # (None is the default, so we can skip the call if it's None) if security_analyzer is None: return try: # Prepare the request payload payload = {'security_analyzer': security_analyzer.model_dump()} # Call agent server API to set security analyzer response = await httpx_client.post( f'{agent_server_url}/api/conversations/{conversation_id}/security_analyzer', json=payload, headers={'X-Session-API-Key': session_api_key}, timeout=30.0, ) response.raise_for_status() _logger.info( f'Successfully set security analyzer for conversation {conversation_id}' ) except Exception as e: # Log error but don't fail conversation creation _logger.warning( f'Failed to set security analyzer for conversation {conversation_id}: {e}', exc_info=True, )