Merge origin/main and fix marketplace_path CI drift

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands
2026-03-19 02:42:17 +00:00
428 changed files with 40375 additions and 5694 deletions

View File

@@ -1,8 +1,14 @@
# OpenHands Architecture
This directory contains the core components of OpenHands.
For an overview of the system architecture, see the [architecture documentation](https://docs.openhands.dev/usage/architecture/backend) (v0 backend architecture).
## Documentation
**[Architecture Documentation](./architecture/README.md)** with diagrams covering:
- System Architecture Overview
- Conversation Startup & WebSocket Flow
- Agent Execution & LLM Flow
- **[External Architecture Docs](https://docs.openhands.dev/usage/architecture/backend)** - Official documentation (v0 backend architecture)
## Classes

View File

@@ -24,6 +24,7 @@ class AppConversationInfoService(ABC):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
sort_order: AppConversationSortOrder = AppConversationSortOrder.CREATED_AT_DESC,
page_id: str | None = None,
limit: int = 100,
@@ -39,6 +40,7 @@ class AppConversationInfoService(ABC):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
) -> int:
"""Count sandboxed conversations."""

View File

@@ -16,6 +16,10 @@ from openhands.sdk.conversation.state import ConversationExecutionStatus
from openhands.sdk.llm import MetricsSnapshot
from openhands.sdk.plugin import PluginSource
from openhands.storage.data_models.conversation_metadata import ConversationTrigger
from openhands.storage.data_models.settings import SandboxGroupingStrategy
# Re-export SandboxGroupingStrategy for backward compatibility
__all__ = ['SandboxGroupingStrategy']
class AgentType(Enum):
@@ -238,3 +242,32 @@ class SkillResponse(BaseModel):
type: Literal['repo', 'knowledge', 'agentskills']
content: str
triggers: list[str] = []
class HookDefinitionResponse(BaseModel):
    """Response model for a single hook definition.

    Mirrors one entry of a matcher's hook list as loaded from the
    workspace's ``.openhands/hooks.json`` via the agent-server.
    """

    type: str  # 'command' or 'prompt'
    command: str  # payload executed/used when the hook fires; meaning depends on `type`
    timeout: int = 60  # hook execution timeout (presumably seconds — confirm agent-server contract)
    # Serialized as 'async' in JSON; named `async_` because 'async' is a Python keyword.
    async_: bool = Field(default=False, serialization_alias='async')
class HookMatcherResponse(BaseModel):
    """Response model for a hook matcher.

    Groups the hook definitions that apply when ``matcher`` matches.
    """

    matcher: str  # Pattern: '*', exact match, or regex
    hooks: list[HookDefinitionResponse] = []  # hooks triggered when the matcher applies
class HookEventResponse(BaseModel):
    """Response model for hooks of a specific event type."""

    event_type: str  # e.g., 'stop', 'pre_tool_use', 'post_tool_use'
    matchers: list[HookMatcherResponse] = []  # matchers configured for this event type
class GetHooksResponse(BaseModel):
    """Response model for hooks endpoint.

    Top-level payload for the conversation hooks endpoint; ``hooks`` holds
    one entry per event type that has at least one matcher configured.
    """

    hooks: list[HookEventResponse] = []

View File

@@ -5,43 +5,29 @@ import logging
import os
import sys
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import Annotated, AsyncGenerator, Literal
from uuid import UUID
import httpx
from openhands.app_server.services.db_session_injector import set_db_session_keep_open
from openhands.app_server.services.httpx_client_injector import (
set_httpx_client_keep_open,
)
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.user.specifiy_user_context import USER_CONTEXT_ATTR
from openhands.app_server.user.user_context import UserContext
from openhands.server.dependencies import get_dependencies
# Handle anext compatibility for Python < 3.10
if sys.version_info >= (3, 10):
from builtins import anext
else:
async def anext(async_iterator):
"""Compatibility function for anext in Python < 3.10"""
return await async_iterator.__anext__()
from fastapi import APIRouter, HTTPException, Query, Request, Response, status
from fastapi.responses import JSONResponse, StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from openhands.app_server.app_conversation.app_conversation_models import (
AppConversation,
AppConversationInfo,
AppConversationPage,
AppConversationStartRequest,
AppConversationStartTask,
AppConversationStartTaskPage,
AppConversationStartTaskSortOrder,
AppConversationUpdateRequest,
GetHooksResponse,
HookDefinitionResponse,
HookEventResponse,
HookMatcherResponse,
SkillResponse,
)
from openhands.app_server.app_conversation.app_conversation_service import (
@@ -49,6 +35,7 @@ from openhands.app_server.app_conversation.app_conversation_service import (
)
from openhands.app_server.app_conversation.app_conversation_service_base import (
AppConversationServiceBase,
get_project_dir,
)
from openhands.app_server.app_conversation.app_conversation_start_task_service import (
AppConversationStartTaskService,
@@ -65,15 +52,35 @@ from openhands.app_server.config import (
)
from openhands.app_server.sandbox.sandbox_models import (
AGENT_SERVER,
SandboxInfo,
SandboxStatus,
)
from openhands.app_server.sandbox.sandbox_service import SandboxService
from openhands.app_server.sandbox.sandbox_spec_models import SandboxSpecInfo
from openhands.app_server.sandbox.sandbox_spec_service import SandboxSpecService
from openhands.app_server.services.db_session_injector import set_db_session_keep_open
from openhands.app_server.services.httpx_client_injector import (
set_httpx_client_keep_open,
)
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.user.specifiy_user_context import USER_CONTEXT_ATTR
from openhands.app_server.user.user_context import UserContext
from openhands.app_server.utils.docker_utils import (
replace_localhost_hostname_for_docker,
)
from openhands.sdk.context.skills import KeywordTrigger, TaskTrigger
from openhands.sdk.workspace.remote.async_remote_workspace import AsyncRemoteWorkspace
from openhands.server.dependencies import get_dependencies
# Handle anext compatibility for Python < 3.10
if sys.version_info >= (3, 10):
from builtins import anext
else:
async def anext(async_iterator):
"""Compatibility function for anext in Python < 3.10"""
return await async_iterator.__anext__()
# We use the get_dependencies method here to signal to the OpenAPI docs that this endpoint
# is protected. The actual protection is provided by SetAuthCookieMiddleware
@@ -91,6 +98,104 @@ httpx_client_dependency = depends_httpx_client()
sandbox_service_dependency = depends_sandbox_service()
sandbox_spec_service_dependency = depends_sandbox_spec_service()
@dataclass
class AgentServerContext:
    """Context for accessing the agent server for a conversation.

    Bundles everything needed by endpoint handlers to talk to the sandbox's
    agent server in a single lookup.
    """

    conversation: AppConversationInfo  # the conversation being accessed
    sandbox: SandboxInfo  # sandbox hosting the conversation (RUNNING when this is returned)
    sandbox_spec: SandboxSpecInfo  # spec for the sandbox (provides working_dir)
    agent_server_url: str  # agent-server URL, localhost already rewritten for Docker
    session_api_key: str | None  # sandbox session API key, if any
async def _get_agent_server_context(
    conversation_id: UUID,
    app_conversation_service: AppConversationService,
    sandbox_service: SandboxService,
    sandbox_spec_service: SandboxSpecService,
) -> AgentServerContext | JSONResponse | None:
    """Resolve everything needed to talk to a conversation's agent server.

    Looks up the conversation, its sandbox, the sandbox spec and the exposed
    agent-server URL in sequence, short-circuiting on the first failure.

    Args:
        conversation_id: The conversation ID
        app_conversation_service: Service for conversation operations
        sandbox_service: Service for sandbox operations
        sandbox_spec_service: Service for sandbox spec operations

    Returns:
        AgentServerContext on success, a 404 JSONResponse when the
        conversation/sandbox/URL cannot be resolved, or None when the sandbox
        is paused (i.e. the conversation is closed).
    """

    def _not_found(message: str) -> JSONResponse:
        # Every failure mode surfaces as a 404 with an explanatory message.
        return JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND,
            content={'error': message},
        )

    conversation = await app_conversation_service.get_app_conversation(conversation_id)
    if not conversation:
        return _not_found(f'Conversation {conversation_id} not found')

    sandbox = await sandbox_service.get_sandbox(conversation.sandbox_id)
    if not sandbox:
        return _not_found(f'Sandbox not found for conversation {conversation_id}')

    if sandbox.status == SandboxStatus.PAUSED:
        # A paused sandbox means the conversation is closed; callers treat
        # None as "respond with an empty payload" rather than an error.
        return None
    if sandbox.status != SandboxStatus.RUNNING:
        # Any other non-running state (STARTING, ERROR, MISSING).
        return _not_found(f'Sandbox not ready for conversation {conversation_id}')

    sandbox_spec = await sandbox_spec_service.get_sandbox_spec(sandbox.sandbox_spec_id)
    if not sandbox_spec:
        # TODO: Temporary workaround — previous sandbox spec versions are not
        # stored when updating OpenHands. Once SandboxSpecServices move to a
        # true multi-spec model this should raise a 404 error instead.
        logger.warning('Sandbox spec not found - using default.')
        sandbox_spec = await sandbox_spec_service.get_default_sandbox_spec()

    if not sandbox.exposed_urls:
        return _not_found('No agent server URL found for sandbox')
    agent_server_url = next(
        (exposed.url for exposed in sandbox.exposed_urls if exposed.name == AGENT_SERVER),
        None,
    )
    if not agent_server_url:
        return _not_found('Agent server URL not found in sandbox')

    return AgentServerContext(
        conversation=conversation,
        sandbox=sandbox,
        sandbox_spec=sandbox_spec,
        agent_server_url=replace_localhost_hostname_for_docker(agent_server_url),
        session_api_key=sandbox.session_api_key,
    )
# Read methods
@@ -116,6 +221,10 @@ async def search_app_conversations(
datetime | None,
Query(title='Filter by updated_at less than this datetime'),
] = None,
sandbox_id__eq: Annotated[
str | None,
Query(title='Filter by exact sandbox_id'),
] = None,
page_id: Annotated[
str | None,
Query(title='Optional next_page_id from the previously returned page'),
@@ -147,6 +256,7 @@ async def search_app_conversations(
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
page_id=page_id,
limit=limit,
include_sub_conversations=include_sub_conversations,
@@ -175,6 +285,10 @@ async def count_app_conversations(
datetime | None,
Query(title='Filter by updated_at less than this datetime'),
] = None,
sandbox_id__eq: Annotated[
str | None,
Query(title='Filter by exact sandbox_id'),
] = None,
app_conversation_service: AppConversationService = (
app_conversation_service_dependency
),
@@ -186,6 +300,7 @@ async def count_app_conversations(
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
)
@@ -476,62 +591,24 @@ async def get_conversation_skills(
- Global skills (OpenHands/skills/)
- User skills (~/.openhands/skills/)
- Organization skills (org/.openhands repository)
- Repository skills (repo/.openhands/skills/ or .openhands/microagents/)
- Repository skills (repo .agents/skills/, .openhands/microagents/, and legacy .openhands/skills/)
Returns:
JSONResponse: A JSON response containing the list of skills.
Returns an empty list if the sandbox is not running.
"""
try:
# Get the conversation info
conversation = await app_conversation_service.get_app_conversation(
conversation_id
# Get agent server context (conversation, sandbox, sandbox_spec, agent_server_url)
ctx = await _get_agent_server_context(
conversation_id,
app_conversation_service,
sandbox_service,
sandbox_spec_service,
)
if not conversation:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': f'Conversation {conversation_id} not found'},
)
# Get the sandbox info
sandbox = await sandbox_service.get_sandbox(conversation.sandbox_id)
if not sandbox or sandbox.status != SandboxStatus.RUNNING:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={
'error': f'Sandbox not found or not running for conversation {conversation_id}'
},
)
# Get the sandbox spec to find the working directory
sandbox_spec = await sandbox_spec_service.get_sandbox_spec(
sandbox.sandbox_spec_id
)
if not sandbox_spec:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Sandbox spec not found'},
)
# Get the agent server URL
if not sandbox.exposed_urls:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'No agent server URL found for sandbox'},
)
agent_server_url = None
for exposed_url in sandbox.exposed_urls:
if exposed_url.name == AGENT_SERVER:
agent_server_url = exposed_url.url
break
if not agent_server_url:
return JSONResponse(
status_code=status.HTTP_404_NOT_FOUND,
content={'error': 'Agent server URL not found in sandbox'},
)
agent_server_url = replace_localhost_hostname_for_docker(agent_server_url)
if isinstance(ctx, JSONResponse):
return ctx
if ctx is None:
return JSONResponse(status_code=status.HTTP_200_OK, content={'skills': []})
# Load skills from all sources
logger.info(f'Loading skills for conversation {conversation_id}')
@@ -539,11 +616,14 @@ async def get_conversation_skills(
# Prefer the shared loader to avoid duplication; otherwise return empty list.
all_skills: list = []
if isinstance(app_conversation_service, AppConversationServiceBase):
project_dir = get_project_dir(
ctx.sandbox_spec.working_dir, ctx.conversation.selected_repository
)
all_skills = await app_conversation_service.load_and_merge_all_skills(
sandbox,
conversation.selected_repository,
sandbox_spec.working_dir,
agent_server_url,
ctx.sandbox,
ctx.conversation.selected_repository,
project_dir,
ctx.agent_server_url,
)
logger.info(
@@ -593,6 +673,150 @@ async def get_conversation_skills(
)
# Hook event types surfaced by the hooks endpoint, in response order.
_HOOK_EVENT_TYPES = (
    'pre_tool_use',
    'post_tool_use',
    'user_prompt_submit',
    'session_start',
    'session_end',
    'stop',
)


def _hook_config_to_event_responses(hook_config) -> list[HookEventResponse]:
    """Transform a HookConfig into response models, one per non-empty event type.

    Args:
        hook_config: HookConfig (or None) returned by the agent-server loader.

    Returns:
        A list of HookEventResponse; empty when no hooks are configured.
    """
    hooks_response: list[HookEventResponse] = []
    if not hook_config:
        return hooks_response
    for event_type in _HOOK_EVENT_TYPES:
        # Each event-type attribute on HookConfig holds a list of matchers.
        matchers = getattr(hook_config, event_type, [])
        if not matchers:
            continue
        matcher_responses = []
        for matcher in matchers:
            hook_defs = [
                HookDefinitionResponse(
                    # hook.type may be an enum or a plain string.
                    type=hook.type.value
                    if hasattr(hook.type, 'value')
                    else str(hook.type),
                    command=hook.command,
                    timeout=hook.timeout,
                    async_=hook.async_,
                )
                for hook in matcher.hooks
            ]
            matcher_responses.append(
                HookMatcherResponse(
                    matcher=matcher.matcher,
                    hooks=hook_defs,
                )
            )
        hooks_response.append(
            HookEventResponse(
                event_type=event_type,
                matchers=matcher_responses,
            )
        )
    return hooks_response


@router.get('/{conversation_id}/hooks')
async def get_conversation_hooks(
    conversation_id: UUID,
    app_conversation_service: AppConversationService = (
        app_conversation_service_dependency
    ),
    sandbox_service: SandboxService = sandbox_service_dependency,
    sandbox_spec_service: SandboxSpecService = sandbox_spec_service_dependency,
    httpx_client: httpx.AsyncClient = httpx_client_dependency,
) -> JSONResponse:
    """Get hooks currently configured in the workspace for this conversation.

    This endpoint loads hooks from the conversation's project directory in the
    workspace (i.e. `{project_dir}/.openhands/hooks.json`) at request time.

    Note:
        This is intentionally a "live" view of the workspace configuration.
        If `.openhands/hooks.json` changes over time, this endpoint reflects the
        latest file content and may not match the hooks that were used when the
        conversation originally started.

    Returns:
        JSONResponse: A JSON response containing the list of hook event types.
            Returns an empty list if the sandbox is not running.
            Returns 502 when the agent-server cannot be reached or errors,
            and 500 on unexpected failures.
    """
    try:
        # Get agent server context (conversation, sandbox, sandbox_spec, agent_server_url)
        ctx = await _get_agent_server_context(
            conversation_id,
            app_conversation_service,
            sandbox_service,
            sandbox_spec_service,
        )
        if isinstance(ctx, JSONResponse):
            return ctx
        if ctx is None:
            # Paused sandbox (closed conversation): report no hooks.
            return JSONResponse(status_code=status.HTTP_200_OK, content={'hooks': []})

        # NOTE(review): imported locally in the original — presumably to avoid
        # an import cycle with hook_loader; confirm before hoisting to top level.
        from openhands.app_server.app_conversation.hook_loader import (
            fetch_hooks_from_agent_server,
            get_project_dir_for_hooks,
        )

        project_dir = get_project_dir_for_hooks(
            ctx.sandbox_spec.working_dir,
            ctx.conversation.selected_repository,
        )

        # Load hooks from agent-server (using the error-raising variant so
        # HTTP/connection failures are surfaced to the user, not hidden).
        logger.debug(
            f'Loading hooks for conversation {conversation_id}, '
            f'agent_server_url={ctx.agent_server_url}, '
            f'project_dir={project_dir}'
        )
        try:
            hook_config = await fetch_hooks_from_agent_server(
                agent_server_url=ctx.agent_server_url,
                session_api_key=ctx.session_api_key,
                project_dir=project_dir,
                httpx_client=httpx_client,
            )
        except httpx.HTTPStatusError as e:
            logger.warning(
                f'Agent-server returned {e.response.status_code} when loading hooks '
                f'for conversation {conversation_id}: {e.response.text}'
            )
            return JSONResponse(
                status_code=status.HTTP_502_BAD_GATEWAY,
                content={
                    'error': f'Agent-server returned status {e.response.status_code} when loading hooks'
                },
            )
        except httpx.RequestError as e:
            logger.warning(
                f'Failed to reach agent-server when loading hooks '
                f'for conversation {conversation_id}: {e}'
            )
            return JSONResponse(
                status_code=status.HTTP_502_BAD_GATEWAY,
                content={'error': 'Failed to reach agent-server when loading hooks'},
            )

        # Transform hook_config to response format
        hooks_response = _hook_config_to_event_responses(hook_config)

        logger.debug(
            f'Loaded {len(hooks_response)} hook event types for conversation {conversation_id}'
        )
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content=GetHooksResponse(hooks=hooks_response).model_dump(by_alias=True),
        )
    except Exception as e:
        # Use logger.exception so the traceback is captured, not just the message.
        logger.exception(f'Error getting hooks for conversation {conversation_id}: {e}')
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={'error': f'Error getting hooks: {str(e)}'},
        )
@router.get('/{conversation_id}/download')
async def export_conversation(
conversation_id: UUID,

View File

@@ -29,6 +29,7 @@ class AppConversationService(ABC):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
sort_order: AppConversationSortOrder = AppConversationSortOrder.CREATED_AT_DESC,
page_id: str | None = None,
limit: int = 100,
@@ -44,6 +45,7 @@ class AppConversationService(ABC):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
) -> int:
"""Count sandboxed conversations."""

View File

@@ -47,6 +47,40 @@ PRE_COMMIT_HOOK = '.git/hooks/pre-commit'
PRE_COMMIT_LOCAL = '.git/hooks/pre-commit.local'
def get_project_dir(
working_dir: str,
selected_repository: str | None = None,
) -> str:
"""Get the project root directory for a conversation.
When a repository is selected, the project root is the cloned repo directory
at {working_dir}/{repo_name}. This is the directory that contains the
`.openhands/` configuration (setup.sh, pre-commit.sh, skills/, etc.).
Without a repository, the project root is the working_dir itself.
This must be used consistently for ALL features that depend on the project root:
- workspace.working_dir (terminal CWD, file editor root, etc.)
- .openhands/setup.sh execution
- .openhands/pre-commit.sh (git hooks setup)
- .openhands/skills/ (project skills)
- PLAN.md path
Args:
working_dir: Base working directory path in the sandbox
(e.g., '/workspace/project' from sandbox_spec)
selected_repository: Repository name (e.g., 'OpenHands/software-agent-sdk')
If provided, the repo name is appended to working_dir.
Returns:
The project root directory path.
"""
if selected_repository:
repo_name = selected_repository.split('/')[-1]
return f'{working_dir}/{repo_name}'
return working_dir
@dataclass
class AppConversationServiceBase(AppConversationService, ABC):
"""App Conversation service which adds git specific functionality.
@@ -61,7 +95,7 @@ class AppConversationServiceBase(AppConversationService, ABC):
self,
sandbox: SandboxInfo,
selected_repository: str | None,
working_dir: str,
project_dir: str,
agent_server_url: str,
) -> list[Skill]:
"""Load skills from all sources via the agent-server.
@@ -71,13 +105,13 @@ class AppConversationServiceBase(AppConversationService, ABC):
- Public skills (from OpenHands/skills GitHub repo)
- User skills (from ~/.openhands/skills/)
- Organization skills (from {org}/.openhands repo)
- Project/repo skills (from workspace .openhands/skills/)
- Project/repo skills (from repo .agents/skills/, .openhands/microagents/, and legacy .openhands/skills/)
- Sandbox skills (from exposed URLs)
Args:
sandbox: SandboxInfo containing exposed URLs and agent-server URL
selected_repository: Repository name or None
working_dir: Working directory path
project_dir: Project root directory (resolved via get_project_dir).
agent_server_url: Agent-server URL (required)
Returns:
@@ -105,12 +139,6 @@ class AppConversationServiceBase(AppConversationService, ABC):
f'Failed to load marketplace_path from user settings: {e}'
)
# Determine project directory for project skills
project_dir = working_dir
if selected_repository:
repo_name = selected_repository.split('/')[-1]
project_dir = f'{working_dir}/{repo_name}'
# Single API call to agent-server for ALL skills
all_skills = await load_skills_from_agent_server(
agent_server_url=agent_server_url,
@@ -190,7 +218,7 @@ class AppConversationServiceBase(AppConversationService, ABC):
agent: Agent,
remote_workspace: AsyncRemoteWorkspace,
selected_repository: str | None,
working_dir: str,
project_dir: str,
):
"""Load all skills and update agent with them.
@@ -198,17 +226,18 @@ class AppConversationServiceBase(AppConversationService, ABC):
sandbox: Sandbox information, including the agent-server session key
agent: The agent to update
remote_workspace: AsyncRemoteWorkspace for loading repo skills
selected_repository: Repository name or None
working_dir: Working directory path
selected_repository: Repository name or None (used for org config)
project_dir: Project root directory (already resolved via get_project_dir).
Returns:
Updated agent with skills loaded into context
"""
# Load and merge all skills
# Extract agent_server_url from remote_workspace host
agent_server_url = remote_workspace.host
all_skills = await self.load_and_merge_all_skills(
sandbox, selected_repository, working_dir, agent_server_url
sandbox,
selected_repository,
project_dir,
agent_server_url,
)
# Update agent with skills
@@ -227,20 +256,27 @@ class AppConversationServiceBase(AppConversationService, ABC):
yield task
await self.clone_or_init_git_repo(task, workspace)
# Compute the project root — the cloned repo directory when a repo is
# selected, or the sandbox working_dir otherwise. This must be used
# for all .openhands/ features (setup.sh, pre-commit.sh, skills).
project_dir = get_project_dir(
workspace.working_dir, task.request.selected_repository
)
task.status = AppConversationStartTaskStatus.RUNNING_SETUP_SCRIPT
yield task
await self.maybe_run_setup_script(workspace)
await self.maybe_run_setup_script(workspace, project_dir)
task.status = AppConversationStartTaskStatus.SETTING_UP_GIT_HOOKS
yield task
await self.maybe_setup_git_hooks(workspace)
await self.maybe_setup_git_hooks(workspace, project_dir)
task.status = AppConversationStartTaskStatus.SETTING_UP_SKILLS
yield task
await self.load_and_merge_all_skills(
sandbox,
task.request.selected_repository,
workspace.working_dir,
project_dir,
agent_server_url,
)
@@ -345,33 +381,42 @@ class AppConversationServiceBase(AppConversationService, ABC):
async def maybe_run_setup_script(
    self,
    workspace: AsyncRemoteWorkspace,
    project_dir: str,
):
    """Run .openhands/setup.sh if it exists in the project root.

    Args:
        workspace: Remote workspace for command execution.
        project_dir: Project root directory (repo root when a repo is selected).
    """
    script_path = f'{project_dir}/.openhands/setup.sh'
    # NOTE(review): the command result is ignored; presumably a missing script
    # just fails the chmod harmlessly — confirm execute_command does not raise
    # on a non-zero exit code.
    await workspace.execute_command(
        f'chmod +x {script_path} && source {script_path}',
        cwd=project_dir,
        timeout=600,
    )
    # TODO: Does this need to be done?
    # Add the action to the event stream as an ENVIRONMENT event
    # source = EventSource.ENVIRONMENT
    # self.event_stream.add_event(action, source)
async def maybe_setup_git_hooks(
self,
workspace: AsyncRemoteWorkspace,
project_dir: str,
):
"""Set up git hooks if .openhands/pre-commit.sh exists in the workspace or repository."""
"""Set up git hooks if .openhands/pre-commit.sh exists in the project root.
Args:
workspace: Remote workspace for command execution.
project_dir: Project root directory (repo root when a repo is selected).
"""
command = 'mkdir -p .git/hooks && chmod +x .openhands/pre-commit.sh'
result = await workspace.execute_command(command, workspace.working_dir)
result = await workspace.execute_command(command, project_dir)
if result.exit_code:
return
# Check if there's an existing pre-commit hook
with tempfile.TemporaryFile(mode='w+t') as temp_file:
result = workspace.file_download(PRE_COMMIT_HOOK, str(temp_file))
if result.get('success'):
result = await workspace.file_download(PRE_COMMIT_HOOK, str(temp_file))
if result.success:
_logger.info('Preserving existing pre-commit hook')
# an existing pre-commit hook exists
if 'This hook was installed by OpenHands' not in temp_file.read():
@@ -380,9 +425,7 @@ class AppConversationServiceBase(AppConversationService, ABC):
f'mv {PRE_COMMIT_HOOK} {PRE_COMMIT_LOCAL} &&'
f'chmod +x {PRE_COMMIT_LOCAL}'
)
result = await workspace.execute_command(
command, workspace.working_dir
)
result = await workspace.execute_command(command, project_dir)
if result.exit_code != 0:
_logger.error(
f'Failed to preserve existing pre-commit hook: {result.stderr}',

View File

@@ -0,0 +1,148 @@
"""Utilities for loading hooks for V1 conversations.
This module provides functions to load hooks from the agent-server,
which centralizes all hook loading logic. The app-server acts as a
thin proxy that calls the agent-server's /api/hooks endpoint.
All hook loading is handled by the agent-server.
"""
import logging
import httpx
from openhands.sdk.hooks import HookConfig
_logger = logging.getLogger(__name__)
def get_project_dir_for_hooks(
working_dir: str,
selected_repository: str | None = None,
) -> str:
"""Get the project directory path for loading hooks.
When a repository is selected, hooks are loaded from
{working_dir}/{repo_name}/.openhands/hooks.json.
Otherwise, hooks are loaded from {working_dir}/.openhands/hooks.json.
Args:
working_dir: Base working directory path in the sandbox
selected_repository: Repository name (e.g., 'OpenHands/software-agent-sdk')
If provided, the repo name is appended to working_dir.
Returns:
The project directory path where hooks.json should be located.
"""
if selected_repository:
repo_name = selected_repository.split('/')[-1]
return f'{working_dir}/{repo_name}'
return working_dir
async def fetch_hooks_from_agent_server(
    agent_server_url: str,
    session_api_key: str | None,
    project_dir: str,
    httpx_client: httpx.AsyncClient,
) -> HookConfig | None:
    """Fetch hooks from the agent-server, raising on HTTP/connection errors.

    Makes a single POST to the agent-server's /api/hooks endpoint and parses
    the returned hook configuration. HTTP and connection errors propagate so
    callers decide how to handle failures.

    Args:
        agent_server_url: URL of the agent server (e.g., 'http://localhost:8000')
        session_api_key: Session API key for authentication (optional)
        project_dir: Workspace directory path for project hooks
        httpx_client: Shared HTTP client for making the request

    Returns:
        HookConfig if hooks.json exists and is valid, None if no hooks found.

    Raises:
        httpx.HTTPStatusError: If the agent-server returns a non-2xx status.
        httpx.RequestError: If the agent-server is unreachable.
    """
    _logger.debug(
        f'fetch_hooks_from_agent_server called: '
        f'agent_server_url={agent_server_url}, project_dir={project_dir}'
    )
    headers = {'Content-Type': 'application/json'}
    if session_api_key:
        headers['X-Session-API-Key'] = session_api_key

    response = await httpx_client.post(
        f'{agent_server_url}/api/hooks',
        json={'project_dir': project_dir},
        headers=headers,
        timeout=30.0,
    )
    response.raise_for_status()

    raw_config = response.json().get('hook_config')
    if raw_config is None:
        _logger.debug('No hooks found in workspace')
        return None

    parsed = HookConfig.from_dict(raw_config)
    if parsed.is_empty():
        _logger.debug('Hooks config is empty')
        return None

    _logger.debug(f'Loaded hooks from agent-server for {project_dir}')
    return parsed
async def load_hooks_from_agent_server(
    agent_server_url: str,
    session_api_key: str | None,
    project_dir: str,
    httpx_client: httpx.AsyncClient,
) -> HookConfig | None:
    """Load hooks from the agent-server, swallowing errors gracefully.

    Wrapper around fetch_hooks_from_agent_server that logs and absorbs every
    failure, returning None instead. Intended for the conversation-start path
    where hooks are optional and must never block startup.

    For the hooks viewer endpoint, use fetch_hooks_from_agent_server directly
    so errors can be surfaced to the user.

    Args:
        agent_server_url: URL of the agent server (e.g., 'http://localhost:8000')
        session_api_key: Session API key for authentication (optional)
        project_dir: Workspace directory path for project hooks
        httpx_client: Shared HTTP client for making the request

    Returns:
        HookConfig if hooks.json exists and is valid, None otherwise.
    """
    try:
        return await fetch_hooks_from_agent_server(
            agent_server_url, session_api_key, project_dir, httpx_client
        )
    except httpx.HTTPStatusError as exc:
        _logger.warning(
            f'Agent-server at {agent_server_url} returned error status {exc.response.status_code} '
            f'when loading hooks from {project_dir}: {exc.response.text}'
        )
    except httpx.RequestError as exc:
        _logger.warning(
            f'Failed to connect to agent-server at {agent_server_url} '
            f'when loading hooks from {project_dir}: {exc}'
        )
    except Exception as exc:
        # Deliberate catch-all: hook loading is best-effort on this path.
        _logger.warning(
            f'Failed to load hooks from agent-server at {agent_server_url} '
            f'for project {project_dir}: {exc}'
        )
    return None

View File

@@ -7,7 +7,7 @@ import zipfile
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, AsyncGenerator, Sequence
from typing import Any, AsyncGenerator, Sequence, cast
from uuid import UUID, uuid4
import httpx
@@ -41,10 +41,14 @@ from openhands.app_server.app_conversation.app_conversation_service import (
)
from openhands.app_server.app_conversation.app_conversation_service_base import (
AppConversationServiceBase,
get_project_dir,
)
from openhands.app_server.app_conversation.app_conversation_start_task_service import (
AppConversationStartTaskService,
)
from openhands.app_server.app_conversation.hook_loader import (
load_hooks_from_agent_server,
)
from openhands.app_server.app_conversation.sql_app_conversation_info_service import (
SQLAppConversationInfoService,
)
@@ -58,6 +62,9 @@ from openhands.app_server.event_callback.event_callback_service import (
from openhands.app_server.event_callback.set_title_callback_processor import (
SetTitleCallbackProcessor,
)
from openhands.app_server.pending_messages.pending_message_service import (
PendingMessageService,
)
from openhands.app_server.sandbox.docker_sandbox_service import DockerSandboxService
from openhands.app_server.sandbox.sandbox_models import (
AGENT_SERVER,
@@ -77,9 +84,10 @@ from openhands.app_server.utils.llm_metadata import (
get_llm_metadata,
should_set_litellm_extra_body,
)
from openhands.integrations.provider import ProviderType
from openhands.integrations.provider import PROVIDER_TOKEN_TYPE, ProviderType
from openhands.integrations.service_types import SuggestedTask
from openhands.sdk import Agent, AgentContext, LocalWorkspace
from openhands.sdk.hooks import HookConfig
from openhands.sdk.llm import LLM
from openhands.sdk.plugin import PluginSource
from openhands.sdk.secret import LookupSecret, SecretValue, StaticSecret
@@ -87,6 +95,7 @@ from openhands.sdk.utils.paging import page_iterator
from openhands.sdk.workspace.remote.async_remote_workspace import AsyncRemoteWorkspace
from openhands.server.types import AppMode
from openhands.storage.data_models.conversation_metadata import ConversationTrigger
from openhands.storage.data_models.settings import SandboxGroupingStrategy
from openhands.tools.preset.default import (
get_default_tools,
)
@@ -125,8 +134,10 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
sandbox_service: SandboxService
sandbox_spec_service: SandboxSpecService
jwt_service: JwtService
pending_message_service: PendingMessageService
sandbox_startup_timeout: int
sandbox_startup_poll_frequency: int
max_num_conversations_per_sandbox: int
httpx_client: httpx.AsyncClient
web_url: str | None
openhands_provider_base_url: str | None
@@ -134,6 +145,11 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
app_mode: str | None = None
tavily_api_key: str | None = None
async def _get_sandbox_grouping_strategy(self) -> SandboxGroupingStrategy:
    """Return the sandbox grouping strategy from the current user's settings."""
    return (await self.user_context.get_user_info()).sandbox_grouping_strategy
async def search_app_conversations(
self,
title__contains: str | None = None,
@@ -141,6 +157,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
sort_order: AppConversationSortOrder = AppConversationSortOrder.CREATED_AT_DESC,
page_id: str | None = None,
limit: int = 20,
@@ -153,6 +170,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
sort_order=sort_order,
page_id=page_id,
limit=limit,
@@ -170,6 +188,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
) -> int:
return await self.app_conversation_info_service.count_app_conversation_info(
title__contains=title__contains,
@@ -177,6 +196,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
)
async def get_app_conversation(
@@ -250,11 +270,20 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
)
assert sandbox_spec is not None
# Set up conversation id
conversation_id = request.conversation_id or uuid4()
# Setup working dir based on grouping
working_dir = sandbox_spec.working_dir
sandbox_grouping_strategy = await self._get_sandbox_grouping_strategy()
if sandbox_grouping_strategy != SandboxGroupingStrategy.NO_GROUPING:
working_dir = f'{working_dir}/{conversation_id.hex}'
# Run setup scripts
remote_workspace = AsyncRemoteWorkspace(
host=agent_server_url,
api_key=sandbox.session_api_key,
working_dir=sandbox_spec.working_dir,
working_dir=working_dir,
)
async for updated_task in self.run_setup_scripts(
task, sandbox, remote_workspace, agent_server_url
@@ -265,13 +294,13 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
start_conversation_request = (
await self._build_start_conversation_request_for_user(
sandbox,
conversation_id,
request.initial_message,
request.system_message_suffix,
request.git_provider,
sandbox_spec.working_dir,
working_dir,
request.agent_type,
request.llm_model,
request.conversation_id,
remote_workspace=remote_workspace,
selected_repository=request.selected_repository,
plugins=request.plugins,
@@ -287,6 +316,12 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
body_json = start_conversation_request.model_dump(
mode='json', context={'expose_secrets': True}
)
# Log hook_config to verify it's being passed
hook_config_in_request = body_json.get('hook_config')
_logger.debug(
f'Sending StartConversationRequest with hook_config: '
f'{hook_config_in_request}'
)
response = await self.httpx_client.post(
f'{agent_server_url}/api/conversations',
json=body_json,
@@ -352,6 +387,15 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
task.app_conversation_id = info.id
yield task
# Process any pending messages queued while waiting for conversation
if sandbox.session_api_key:
await self._process_pending_messages(
task_id=task.id,
conversation_id=info.id,
agent_server_url=agent_server_url,
session_api_key=sandbox.session_api_key,
)
except Exception as exc:
_logger.exception('Error starting conversation', stack_info=True)
task.status = AppConversationStartTaskStatus.ERROR
@@ -490,21 +534,157 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
result[stored_conversation.sandbox_id].append(stored_conversation.id)
return result
async def _find_running_sandbox_for_user(self) -> SandboxInfo | None:
    """Find a running sandbox for the current user based on the grouping strategy.

    This is a best-effort lookup: returning None (whether because grouping is
    disabled, no sandbox qualifies, or an error occurred) makes the caller
    create a fresh sandbox instead.

    Returns:
        SandboxInfo if a running sandbox is found, None otherwise.
    """
    try:
        user_id = await self.user_context.get_user_id()
        sandbox_grouping_strategy = await self._get_sandbox_grouping_strategy()

        # If no grouping, return None to force creation of a new sandbox
        if sandbox_grouping_strategy == SandboxGroupingStrategy.NO_GROUPING:
            return None

        # Collect all running sandboxes for this user, walking every page of
        # the sandbox service's paginated listing.
        running_sandboxes = []
        page_id = None
        while True:
            page = await self.sandbox_service.search_sandboxes(
                page_id=page_id, limit=100
            )
            # The listing is not filtered server-side, so filter by status
            # and ownership here.
            for sandbox in page.items:
                if (
                    sandbox.status == SandboxStatus.RUNNING
                    and sandbox.created_by_user_id == user_id
                ):
                    running_sandboxes.append(sandbox)
            if page.next_page_id is None:
                break
            page_id = page.next_page_id

        if not running_sandboxes:
            return None

        # Apply the grouping strategy (may still return None if every
        # candidate is at its conversation capacity).
        return await self._select_sandbox_by_strategy(
            running_sandboxes, sandbox_grouping_strategy
        )
    except Exception as e:
        # Deliberately broad: reuse is an optimization, so any failure here
        # must fall back to creating a new sandbox rather than surfacing.
        _logger.warning(
            f'Error finding running sandbox for user: {e}', exc_info=True
        )
        return None
async def _select_sandbox_by_strategy(
    self,
    running_sandboxes: list[SandboxInfo],
    sandbox_grouping_strategy: SandboxGroupingStrategy,
) -> SandboxInfo | None:
    """Pick one of *running_sandboxes* according to the grouping strategy.

    Sandboxes that already hold ``max_num_conversations_per_sandbox``
    conversations are excluded before the strategy is applied.

    Args:
        running_sandboxes: Running sandboxes owned by the current user.
        sandbox_grouping_strategy: Strategy controlling the selection.

    Returns:
        The chosen sandbox, or None when every sandbox is at capacity
        (the caller then creates a new one).
    """
    conversation_counts = await self._get_conversation_counts_by_sandbox(
        [sandbox.id for sandbox in running_sandboxes]
    )

    def num_conversations(sandbox: SandboxInfo) -> int:
        # Missing entries (e.g. after a count failure) are treated as empty.
        return conversation_counts.get(sandbox.id, 0)

    # Drop sandboxes that have reached the per-sandbox conversation cap.
    candidates = [
        sandbox
        for sandbox in running_sandboxes
        if num_conversations(sandbox) < self.max_num_conversations_per_sandbox
    ]
    if not candidates:
        return None

    if sandbox_grouping_strategy == SandboxGroupingStrategy.GROUP_BY_NEWEST:
        return max(candidates, key=lambda sandbox: sandbox.created_at)
    if sandbox_grouping_strategy == SandboxGroupingStrategy.LEAST_RECENTLY_USED:
        # NOTE(review): uses creation time as a proxy for "last used" —
        # confirm whether actual usage timestamps should drive this.
        return min(candidates, key=lambda sandbox: sandbox.created_at)
    if sandbox_grouping_strategy == SandboxGroupingStrategy.FEWEST_CONVERSATIONS:
        return min(candidates, key=num_conversations)
    # ADD_TO_ANY and any unrecognized strategy: first available sandbox.
    return candidates[0]
async def _get_conversation_counts_by_sandbox(
    self, sandbox_ids: list[str]
) -> dict[str, int]:
    """Map each sandbox ID to its number of conversations.

    Issues one count query per sandbox; this stays cheap because a user has
    at most a handful (~8) of running sandboxes at a time.

    Args:
        sandbox_ids: Sandbox IDs to count conversations for.

    Returns:
        Mapping of sandbox_id to conversation count. Empty on error, which
        makes strategy selection fall back to the first available sandbox.
    """
    try:
        return {
            sandbox_id: (
                await self.app_conversation_info_service.count_app_conversation_info(
                    sandbox_id__eq=sandbox_id
                )
            )
            for sandbox_id in sandbox_ids
        }
    except Exception as e:
        _logger.warning(
            f'Error counting conversations by sandbox: {e}', exc_info=True
        )
        return {}
async def _wait_for_sandbox_start(
self, task: AppConversationStartTask
) -> AsyncGenerator[AppConversationStartTask, None]:
"""Wait for sandbox to start and return info."""
# Get or create the sandbox
if not task.request.sandbox_id:
# Convert conversation_id to hex string if present
sandbox_id_str = (
task.request.conversation_id.hex
if task.request.conversation_id is not None
else None
)
sandbox = await self.sandbox_service.start_sandbox(
sandbox_id=sandbox_id_str
)
# First try to find a running sandbox for the current user
sandbox = await self._find_running_sandbox_for_user()
if sandbox is None:
# No running sandbox found, start a new one
# Convert conversation_id to hex string if present
sandbox_id_str = (
task.request.conversation_id.hex
if task.request.conversation_id is not None
else None
)
sandbox = await self.sandbox_service.start_sandbox(
sandbox_id=sandbox_id_str
)
task.sandbox_id = sandbox.id
else:
sandbox_info = await self.sandbox_service.get_sandbox(
@@ -657,7 +837,10 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
secrets = await self.user_context.get_secrets()
# Get all provider tokens from user authentication
provider_tokens = await self.user_context.get_provider_tokens()
provider_tokens = cast(
PROVIDER_TOKEN_TYPE | None,
await self.user_context.get_provider_tokens(),
)
if not provider_tokens:
return secrets
@@ -1125,10 +1308,50 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
run=initial_message.run,
)
async def _load_hooks_from_workspace(
    self,
    remote_workspace: AsyncRemoteWorkspace,
    project_dir: str,
) -> HookConfig | None:
    """Load project-level hooks from .openhands/hooks.json via the agent server.

    Mirrors how skills are loaded (/api/skills) by delegating to the agent
    server's /api/hooks endpoint, so project hooks are picked up automatically
    when a conversation starts — the same behavior OpenHands-CLI provides.

    Args:
        remote_workspace: Workspace handle whose host and session key identify
            the agent server to query.
        project_dir: Resolved project root inside the sandbox. This is already
            the final directory (e.g. {working_dir}/{repo_name} when a repo is
            selected) — do not append the repo name again.

    Returns:
        The parsed HookConfig, or None when hooks.json is missing, contains
        invalid JSON, is empty, or the agent server is unreachable. Hook
        loading is best-effort by design: failures are logged as warnings and
        must never block conversation startup.
    """
    # NOTE(review): reaches into a private attribute for the session key —
    # consider exposing it on AsyncRemoteWorkspace instead.
    session_api_key = remote_workspace._headers.get('X-Session-API-Key')
    return await load_hooks_from_agent_server(
        agent_server_url=remote_workspace.host,
        session_api_key=session_api_key,
        project_dir=project_dir,
        httpx_client=self.httpx_client,
    )
async def _finalize_conversation_request(
self,
agent: Agent,
conversation_id: UUID | None,
conversation_id: UUID,
user: UserInfo,
workspace: LocalWorkspace,
initial_message: SendMessageRequest | None,
@@ -1164,6 +1387,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
agent = self._update_agent_with_llm_metadata(agent, conversation_id, user.id)
# Load and merge skills if remote workspace is available
hook_config: HookConfig | None = None
if remote_workspace:
try:
agent = await self._load_skills_and_update_agent(
@@ -1173,6 +1397,28 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
_logger.warning(f'Failed to load skills: {e}', exc_info=True)
# Continue without skills - don't fail conversation startup
# Load hooks from workspace (.openhands/hooks.json)
# Note: working_dir is already the resolved project_dir
# (includes repo name when a repo is selected), so we pass
# it directly without appending the repo name again.
try:
_logger.debug(
f'Attempting to load hooks from workspace: '
f'project_dir={working_dir}'
)
hook_config = await self._load_hooks_from_workspace(
remote_workspace, working_dir
)
if hook_config:
_logger.debug(
f'Successfully loaded hooks: {hook_config.model_dump()}'
)
else:
_logger.debug('No hooks found in workspace')
except Exception as e:
_logger.warning(f'Failed to load hooks: {e}', exc_info=True)
# Continue without hooks - don't fail conversation startup
# Incorporate plugin parameters into initial message if specified
final_initial_message = self._construct_initial_message_with_plugin_params(
initial_message, plugins
@@ -1201,18 +1447,19 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
initial_message=final_initial_message,
secrets=secrets,
plugins=sdk_plugins,
hook_config=hook_config,
)
async def _build_start_conversation_request_for_user(
self,
sandbox: SandboxInfo,
conversation_id: UUID,
initial_message: SendMessageRequest | None,
system_message_suffix: str | None,
git_provider: ProviderType | None,
working_dir: str,
agent_type: AgentType = AgentType.DEFAULT,
llm_model: str | None = None,
conversation_id: UUID | None = None,
remote_workspace: AsyncRemoteWorkspace | None = None,
selected_repository: str | None = None,
plugins: list[PluginSpec] | None = None,
@@ -1227,7 +1474,12 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
5. Passing plugins to the agent server for remote plugin loading
"""
user = await self.user_context.get_user_info()
workspace = LocalWorkspace(working_dir=working_dir)
# Compute the project root — this is the repo directory when a repo is
# selected, or the sandbox working_dir otherwise. All tools, hooks,
# setup scripts, and plan paths must use this consistently.
project_dir = get_project_dir(working_dir, selected_repository)
workspace = LocalWorkspace(working_dir=project_dir)
# Set up secrets for all git providers
secrets = await self._setup_secrets_for_git_providers(user)
@@ -1244,7 +1496,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
user.condenser_max_size,
secrets=secrets,
git_provider=git_provider,
working_dir=working_dir,
working_dir=project_dir,
)
# Finalize and return the conversation request
@@ -1258,10 +1510,93 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
sandbox,
remote_workspace,
selected_repository,
working_dir,
project_dir,
plugins=plugins,
)
async def _process_pending_messages(
    self,
    task_id: UUID,
    conversation_id: UUID,
    agent_server_url: str,
    session_api_key: str,
) -> None:
    """Process pending messages queued before the conversation was ready.

    Messages are delivered sequentially, in queue order, to the agent server.
    After processing, all messages are deleted from the database regardless of
    success or failure — a failed delivery is logged and dropped, not retried.

    Args:
        task_id: The start task ID (may have been used as conversation_id initially)
        conversation_id: The real conversation ID
        agent_server_url: URL of the agent server
        session_api_key: API key for authenticating with agent server
    """
    # Convert UUIDs to strings for the pending message service.
    # The frontend uses task-{uuid.hex} format (no hyphens), matching OpenHandsUUID serialization.
    task_id_str = f'task-{task_id.hex}'
    # conversation_id uses standard format (with hyphens) for agent server API compatibility
    conversation_id_str = str(conversation_id)
    _logger.debug(f'task_id={task_id_str} conversation_id={conversation_id_str}')

    # First, re-key any messages that were queued under the start task's ID
    # before the real conversation ID existed.
    updated_count = await self.pending_message_service.update_conversation_id(
        old_conversation_id=task_id_str,
        new_conversation_id=conversation_id_str,
    )
    if updated_count > 0:
        _logger.info(
            f'Updated {updated_count} pending messages from task_id={task_id_str} '
            f'to conversation_id={conversation_id_str}'
        )

    # Get all pending messages for this conversation
    pending_messages = await self.pending_message_service.get_pending_messages(
        conversation_id_str
    )
    if not pending_messages:
        return

    _logger.info(
        f'Processing {len(pending_messages)} pending messages for '
        f'conversation {conversation_id_str}'
    )

    # Process messages sequentially to preserve order
    for msg in pending_messages:
        try:
            # Serialize content objects to JSON-compatible dicts
            content_json = [item.model_dump() for item in msg.content]

            # Use the events endpoint which handles message sending
            response = await self.httpx_client.post(
                f'{agent_server_url}/api/conversations/{conversation_id_str}/events',
                json={
                    'role': msg.role,
                    'content': content_json,
                    'run': True,
                },
                headers={'X-Session-API-Key': session_api_key},
                timeout=30.0,
            )
            response.raise_for_status()
            _logger.debug(f'Delivered pending message {msg.id}')
        except Exception as e:
            # Best-effort delivery: the failed message is logged here and
            # removed by the bulk delete below.
            _logger.warning(f'Failed to deliver pending message {msg.id}: {e}')

    # Delete all pending messages after processing (regardless of success/failure)
    deleted_count = (
        await self.pending_message_service.delete_messages_for_conversation(
            conversation_id_str
        )
    )
    _logger.info(
        f'Finished processing pending messages for conversation {conversation_id_str}. '
        f'Deleted {deleted_count} messages.'
    )
async def update_agent_server_conversation_title(
self,
conversation_id: str,
@@ -1604,6 +1939,10 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
sandbox_startup_poll_frequency: int = Field(
default=2, description='The frequency to poll for sandbox readiness'
)
max_num_conversations_per_sandbox: int = Field(
default=20,
description='The maximum number of conversations allowed per sandbox',
)
init_git_in_empty_workspace: bool = Field(
default=True,
description='Whether to initialize a git repo when the workspace is empty',
@@ -1630,6 +1969,7 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
get_global_config,
get_httpx_client,
get_jwt_service,
get_pending_message_service,
get_sandbox_service,
get_sandbox_spec_service,
get_user_context,
@@ -1649,6 +1989,7 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
get_event_service(state, request) as event_service,
get_jwt_service(state, request) as jwt_service,
get_httpx_client(state, request) as httpx_client,
get_pending_message_service(state, request) as pending_message_service,
):
access_token_hard_timeout = None
if self.access_token_hard_timeout:
@@ -1693,8 +2034,10 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
event_callback_service=event_callback_service,
event_service=event_service,
jwt_service=jwt_service,
pending_message_service=pending_message_service,
sandbox_startup_timeout=self.sandbox_startup_timeout,
sandbox_startup_poll_frequency=self.sandbox_startup_poll_frequency,
max_num_conversations_per_sandbox=self.max_num_conversations_per_sandbox,
httpx_client=httpx_client,
web_url=web_url,
openhands_provider_base_url=config.openhands_provider_base_url,

View File

@@ -33,21 +33,6 @@ class ExposedUrlConfig(BaseModel):
port: int
WORK_HOSTS_SKILL_FOOTER = """
When starting a web server, use the corresponding ports via environment variables:
- $WORKER_1 for the first port
- $WORKER_2 for the second port
**CRITICAL: You MUST enable CORS and bind to 0.0.0.0.** Without CORS headers, the App tab cannot detect your server and will show an empty state.
Example (Flask):
```python
from flask_cors import CORS
CORS(app)
app.run(host='0.0.0.0', port=int(os.environ.get('WORKER_1', 12000)))
```"""
class SandboxConfig(BaseModel):
"""Sandbox configuration for agent-server API request."""

View File

@@ -119,6 +119,7 @@ class SQLAppConversationInfoService(AppConversationInfoService):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
sort_order: AppConversationSortOrder = AppConversationSortOrder.CREATED_AT_DESC,
page_id: str | None = None,
limit: int = 100,
@@ -141,6 +142,7 @@ class SQLAppConversationInfoService(AppConversationInfoService):
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
)
# Add sort order
@@ -195,6 +197,7 @@ class SQLAppConversationInfoService(AppConversationInfoService):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
) -> int:
"""Count sandboxed conversations matching the given filters."""
query = select(func.count(StoredConversationMetadata.conversation_id)).where(
@@ -208,6 +211,7 @@ class SQLAppConversationInfoService(AppConversationInfoService):
created_at__lt=created_at__lt,
updated_at__gte=updated_at__gte,
updated_at__lt=updated_at__lt,
sandbox_id__eq=sandbox_id__eq,
)
result = await self.db_session.execute(query)
@@ -222,6 +226,7 @@ class SQLAppConversationInfoService(AppConversationInfoService):
created_at__lt: datetime | None = None,
updated_at__gte: datetime | None = None,
updated_at__lt: datetime | None = None,
sandbox_id__eq: str | None = None,
) -> Select:
# Apply the same filters as search_app_conversations
conditions = []
@@ -246,6 +251,9 @@ class SQLAppConversationInfoService(AppConversationInfoService):
StoredConversationMetadata.last_updated_at < updated_at__lt
)
if sandbox_id__eq is not None:
conditions.append(StoredConversationMetadata.sandbox_id == sandbox_id__eq)
if conditions:
query = query.where(*conditions)
return query

View File

@@ -0,0 +1,39 @@
"""Add pending_messages table for server-side message queuing
Revision ID: 007
Revises: 006
Create Date: 2025-03-15 00:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = '007'
down_revision: Union[str, None] = '006'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create pending_messages table for storing messages before conversation is ready.

    Messages are stored temporarily until the conversation becomes ready, then
    delivered and deleted regardless of success or failure.
    """
    op.create_table(
        'pending_messages',
        # String primary key: callers supply their own IDs (e.g. UUID strings).
        sa.Column('id', sa.String(), primary_key=True),
        # Indexed: the hot query is "all pending messages for a conversation".
        sa.Column('conversation_id', sa.String(), nullable=False, index=True),
        sa.Column('role', sa.String(20), nullable=False, server_default='user'),
        # Message content blocks serialized as JSON.
        sa.Column('content', sa.JSON, nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), nullable=False),
    )
def downgrade() -> None:
    """Remove pending_messages table.

    Any messages still queued in the table are lost on rollback.
    """
    op.drop_table('pending_messages')

View File

@@ -33,6 +33,10 @@ from openhands.app_server.event_callback.event_callback_service import (
EventCallbackService,
EventCallbackServiceInjector,
)
from openhands.app_server.pending_messages.pending_message_service import (
PendingMessageService,
PendingMessageServiceInjector,
)
from openhands.app_server.sandbox.sandbox_service import (
SandboxService,
SandboxServiceInjector,
@@ -56,6 +60,7 @@ from openhands.app_server.web_client.web_client_config_injector import (
)
from openhands.sdk.utils.models import OpenHandsModel
from openhands.server.types import AppMode
from openhands.utils.environment import StorageProvider, get_storage_provider
def get_default_persistence_dir() -> Path:
@@ -113,6 +118,7 @@ class AppServerConfig(OpenHandsModel):
app_conversation_info: AppConversationInfoServiceInjector | None = None
app_conversation_start_task: AppConversationStartTaskServiceInjector | None = None
app_conversation: AppConversationServiceInjector | None = None
pending_message: PendingMessageServiceInjector | None = None
user: UserContextInjector | None = None
jwt: JwtServiceInjector | None = None
httpx: HttpxClientInjector = Field(default_factory=HttpxClientInjector)
@@ -140,6 +146,9 @@ def config_from_env() -> AppServerConfig:
from openhands.app_server.app_conversation.sql_app_conversation_start_task_service import ( # noqa: E501
SQLAppConversationStartTaskServiceInjector,
)
from openhands.app_server.event.aws_event_service import (
AwsEventServiceInjector,
)
from openhands.app_server.event.filesystem_event_service import (
FilesystemEventServiceInjector,
)
@@ -174,8 +183,18 @@ def config_from_env() -> AppServerConfig:
config: AppServerConfig = from_env(AppServerConfig, 'OH') # type: ignore
if config.event is None:
if os.environ.get('FILE_STORE') == 'google_cloud':
# Legacy V0 google cloud storage configuration
provider = get_storage_provider()
if provider == StorageProvider.AWS:
# AWS S3 storage configuration
bucket_name = os.environ.get('FILE_STORE_PATH')
if not bucket_name:
raise ValueError(
'FILE_STORE_PATH environment variable is required for S3 storage'
)
config.event = AwsEventServiceInjector(bucket_name=bucket_name)
elif provider == StorageProvider.GCP:
# Google Cloud storage configuration
config.event = GoogleCloudEventServiceInjector(
bucket_name=os.environ.get('FILE_STORE_PATH')
)
@@ -266,6 +285,13 @@ def config_from_env() -> AppServerConfig:
tavily_api_key=tavily_api_key
)
if config.pending_message is None:
from openhands.app_server.pending_messages.pending_message_service import (
SQLPendingMessageServiceInjector,
)
config.pending_message = SQLPendingMessageServiceInjector()
if config.user is None:
config.user = AuthUserContextInjector()
@@ -344,6 +370,14 @@ def get_app_conversation_service(
return injector.context(state, request)
def get_pending_message_service(
    state: InjectorState, request: Request | None = None
) -> AsyncContextManager[PendingMessageService]:
    """Resolve the configured pending-message injector into a service context."""
    injector = get_global_config().pending_message
    # config_from_env() installs a default SQL injector, so this never fires
    # when the config was built normally.
    assert injector is not None
    return injector.context(state, request)
def get_user_context(
state: InjectorState, request: Request | None = None
) -> AsyncContextManager[UserContext]:
@@ -419,6 +453,12 @@ def depends_app_conversation_service():
return Depends(injector.depends)
def depends_pending_message_service():
    """FastAPI dependency wrapper around the pending message service injector."""
    injector = get_global_config().pending_message
    assert injector is not None
    return Depends(injector.depends)
def depends_user_context():
injector = get_global_config().user
assert injector is not None

View File

@@ -0,0 +1,113 @@
"""AWS S3-based EventService implementation.
This implementation uses role-based authentication (no credentials needed).
"""
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncGenerator
import boto3
import botocore.exceptions
from fastapi import Request
from openhands.app_server.config import get_app_conversation_info_service
from openhands.app_server.event.event_service import EventService, EventServiceInjector
from openhands.app_server.event.event_service_base import EventServiceBase
from openhands.app_server.services.injector import InjectorState
from openhands.sdk import Event
_logger = logging.getLogger(__name__)
@dataclass
class AwsEventService(EventServiceBase):
    """AWS S3-backed implementation of EventService.

    Relies on ambient (role-based) AWS credentials; no keys are handled here.
    """

    # boto3 S3 client; typed Any to avoid depending on boto3 type stubs.
    s3_client: Any
    bucket_name: str

    def _load_event(self, path: Path) -> Event | None:
        """Fetch and deserialize the event stored under *path*, or None."""
        key = str(path)
        try:
            response = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
            with response['Body'] as stream:
                payload = stream.read().decode('utf-8')
            return Event.model_validate_json(payload)
        except botocore.exceptions.ClientError as e:
            # A missing key is an expected "not found"; anything else is logged.
            if e.response['Error']['Code'] == 'NoSuchKey':
                return None
            _logger.exception(f'Error reading event from {path}')
            return None
        except Exception:
            # Covers network failures and malformed/unparseable payloads.
            _logger.exception(f'Error reading event from {path}')
            return None

    def _store_event(self, path: Path, event: Event):
        """Serialize *event* as pretty-printed JSON and upload it under *path*."""
        payload = json.dumps(event.model_dump(mode='json'), indent=2)
        self.s3_client.put_object(
            Bucket=self.bucket_name,
            Key=str(path),
            Body=payload.encode('utf-8'),
        )

    def _search_paths(self, prefix: Path, page_id: str | None = None) -> list[Path]:
        """List object keys under *prefix* as Paths.

        *page_id*, when given, is forwarded as the S3 ContinuationToken.
        NOTE(review): the response's NextContinuationToken is not surfaced
        here, so listings beyond one page are only reachable if the caller
        supplies page_id — confirm against EventServiceBase's contract.
        """
        request: dict[str, Any] = {
            'Bucket': self.bucket_name,
            'Prefix': str(prefix),
        }
        if page_id:
            request['ContinuationToken'] = page_id
        response = self.s3_client.list_objects_v2(**request)
        return [Path(entry['Key']) for entry in response.get('Contents', [])]
class AwsEventServiceInjector(EventServiceInjector):
    """Injector that builds an AwsEventService scoped to the request's user."""

    bucket_name: str
    prefix: Path = Path('users')

    async def inject(
        self, state: InjectorState, request: Request | None = None
    ) -> AsyncGenerator[EventService, None]:
        # Imported lazily: the config module imports this class, so a
        # top-level import here would be circular.
        from openhands.app_server.config import get_user_context

        async with (
            get_user_context(state, request) as user_context,
            get_app_conversation_info_service(
                state, request
            ) as app_conversation_info_service,
        ):
            user_id = await user_context.get_user_id()

            # Role-based authentication: boto3 picks up IAM role credentials
            # automatically when running in AWS. AWS_S3_ENDPOINT allows
            # targeting an S3-compatible endpoint.
            client = boto3.client(
                's3',
                endpoint_url=os.getenv('AWS_S3_ENDPOINT'),
            )
            yield AwsEventService(
                prefix=self.prefix,
                user_id=user_id,
                app_conversation_info_service=app_conversation_info_service,
                s3_client=client,
                bucket_name=self.bucket_name,
                app_conversation_info_load_tasks={},
            )

View File

@@ -106,14 +106,15 @@ class EventServiceBase(EventService, ABC):
reverse=(sort_order == EventSortOrder.TIMESTAMP_DESC),
)
# Apply pagination to items (not paths)
start_offset = 0
next_page_id = None
if page_id:
start_offset = int(page_id)
paths = paths[start_offset:]
if len(paths) > limit:
paths = paths[:limit]
items = items[start_offset:]
if len(items) > limit:
next_page_id = str(start_offset + limit)
items = items[:limit]
return EventPage(items=items, next_page_id=next_page_id)

View File

@@ -1,6 +1,9 @@
import asyncio
import logging
from uuid import UUID
import httpx
from openhands.app_server.app_conversation.app_conversation_models import (
AppConversationInfo,
)
@@ -22,6 +25,9 @@ from openhands.sdk import Event, MessageEvent
_logger = logging.getLogger(__name__)
# Poll with ~3.75s total wait per message event before retrying later.
_TITLE_POLL_DELAYS_S = (0.25, 0.5, 1.0, 2.0)
class SetTitleCallbackProcessor(EventCallbackProcessor):
"""Callback processor which sets conversation titles."""
@@ -51,7 +57,6 @@ class SetTitleCallbackProcessor(EventCallbackProcessor):
get_app_conversation_info_service(state) as app_conversation_info_service,
get_httpx_client(state) as httpx_client,
):
# Generate a title for the conversation
app_conversation = await app_conversation_service.get_app_conversation(
conversation_id
)
@@ -61,15 +66,38 @@ class SetTitleCallbackProcessor(EventCallbackProcessor):
app_conversation_url = replace_localhost_hostname_for_docker(
app_conversation_url
)
response = await httpx_client.post(
f'{app_conversation_url}/generate_title',
headers={
'X-Session-API-Key': app_conversation.session_api_key,
},
content='{}',
)
response.raise_for_status()
title = response.json()['title']
title = None
for delay_s in _TITLE_POLL_DELAYS_S:
try:
response = await httpx_client.get(
app_conversation_url,
headers={
'X-Session-API-Key': app_conversation.session_api_key,
},
)
response.raise_for_status()
except httpx.HTTPError as exc:
# Transient agent-server failures are acceptable; retry later.
_logger.debug(
'Title poll failed for conversation %s: %s',
conversation_id,
exc,
)
else:
title = response.json().get('title')
if title:
break
# Backoff applies to both missing-title responses and transient errors.
await asyncio.sleep(delay_s)
if not title:
# Keep the callback active so later message events can retry.
_logger.info(
f'Conversation {conversation_id} title not available yet; '
'will retry on a future message event.'
)
return None
# Save the conversation info
info = AppConversationInfo(

View File

@@ -6,7 +6,7 @@ import logging
import pkgutil
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Response, status
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.security import APIKeyHeader
from jwt import InvalidTokenError
from pydantic import SecretStr
@@ -23,61 +23,87 @@ from openhands.app_server.config import (
depends_app_conversation_info_service,
depends_event_service,
depends_jwt_service,
depends_sandbox_service,
get_event_callback_service,
get_global_config,
get_sandbox_service,
)
from openhands.app_server.errors import AuthError
from openhands.app_server.event.event_service import EventService
from openhands.app_server.sandbox.sandbox_models import SandboxInfo
from openhands.app_server.sandbox.sandbox_service import SandboxService
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.services.jwt_service import JwtService
from openhands.app_server.user.auth_user_context import AuthUserContext
from openhands.app_server.user.specifiy_user_context import (
ADMIN,
USER_CONTEXT_ATTR,
SpecifyUserContext,
as_admin,
)
from openhands.app_server.user.user_context import UserContext
from openhands.integrations.provider import ProviderType
from openhands.sdk import ConversationExecutionStatus, Event
from openhands.sdk.event import ConversationStateUpdateEvent
from openhands.server.types import AppMode
from openhands.server.user_auth.default_user_auth import DefaultUserAuth
from openhands.server.user_auth.user_auth import (
get_for_user as get_user_auth_for_user,
)
router = APIRouter(prefix='/webhooks', tags=['Webhooks'])
sandbox_service_dependency = depends_sandbox_service()
event_service_dependency = depends_event_service()
app_conversation_info_service_dependency = depends_app_conversation_info_service()
jwt_dependency = depends_jwt_service()
app_mode = get_global_config().app_mode
_logger = logging.getLogger(__name__)
async def valid_sandbox(
user_context: UserContext = Depends(as_admin),
request: Request,
session_api_key: str = Depends(
APIKeyHeader(name='X-Session-API-Key', auto_error=False)
),
sandbox_service: SandboxService = sandbox_service_dependency,
) -> SandboxInfo:
"""Use a session api key for validation, and get a sandbox. Subsequent actions
are executed in the context of the owner of the sandbox"""
if not session_api_key:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='X-Session-API-Key header is required'
)
sandbox_info = await sandbox_service.get_sandbox_by_session_api_key(session_api_key)
if sandbox_info is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='Invalid session API key'
# Create a state which will be used internally only for this operation
state = InjectorState()
# Since we need access to all sandboxes, this is executed in the context of the admin.
setattr(state, USER_CONTEXT_ATTR, ADMIN)
async with get_sandbox_service(state) as sandbox_service:
sandbox_info = await sandbox_service.get_sandbox_by_session_api_key(
session_api_key
)
return sandbox_info
if sandbox_info is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='Invalid session API key'
)
# In SAAS Mode there is always a user, so we set the owner of the sandbox
# as the current user (Validated by the session_api_key they provided)
if sandbox_info.created_by_user_id:
setattr(
request.state,
USER_CONTEXT_ATTR,
SpecifyUserContext(sandbox_info.created_by_user_id),
)
elif app_mode == AppMode.SAAS:
_logger.error(
'Sandbox had no user specified', extra={'sandbox_id': sandbox_info.id}
)
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='Sandbox had no user specified'
)
return sandbox_info
async def valid_conversation(
conversation_id: UUID,
sandbox_info: SandboxInfo,
sandbox_info: SandboxInfo = Depends(valid_sandbox),
app_conversation_info_service: AppConversationInfoService = app_conversation_info_service_dependency,
) -> AppConversationInfo:
app_conversation_info = (
@@ -90,9 +116,11 @@ async def valid_conversation(
sandbox_id=sandbox_info.id,
created_by_user_id=sandbox_info.created_by_user_id,
)
# Sanity check - Make sure that the conversation and sandbox were created by the same user
if app_conversation_info.created_by_user_id != sandbox_info.created_by_user_id:
# Make sure that the conversation and sandbox were created by the same user
raise AuthError()
return app_conversation_info
@@ -139,15 +167,11 @@ async def on_conversation_update(
async def on_event(
events: list[Event],
conversation_id: UUID,
sandbox_info: SandboxInfo = Depends(valid_sandbox),
app_conversation_info: AppConversationInfo = Depends(valid_conversation),
app_conversation_info_service: AppConversationInfoService = app_conversation_info_service_dependency,
event_service: EventService = event_service_dependency,
) -> Success:
"""Webhook callback for when event stream events occur."""
app_conversation_info = await valid_conversation(
conversation_id, sandbox_info, app_conversation_info_service
)
try:
# Save events...
await asyncio.gather(

View File

@@ -0,0 +1,21 @@
"""Pending messages module for server-side message queuing."""
from openhands.app_server.pending_messages.pending_message_models import (
PendingMessage,
PendingMessageResponse,
)
from openhands.app_server.pending_messages.pending_message_service import (
PendingMessageService,
PendingMessageServiceInjector,
SQLPendingMessageService,
SQLPendingMessageServiceInjector,
)
__all__ = [
'PendingMessage',
'PendingMessageResponse',
'PendingMessageService',
'PendingMessageServiceInjector',
'SQLPendingMessageService',
'SQLPendingMessageServiceInjector',
]

View File

@@ -0,0 +1,32 @@
"""Models for pending message queue functionality."""
from datetime import datetime
from uuid import uuid4
from pydantic import BaseModel, Field
from openhands.agent_server.models import ImageContent, TextContent
from openhands.agent_server.utils import utc_now
class PendingMessage(BaseModel):
    """A message queued for delivery when conversation becomes ready.

    Pending messages are stored in the database and delivered to the agent_server
    when the conversation transitions to READY status. Messages are deleted after
    processing, regardless of success or failure.
    """

    # Stringified UUID4; also used as the primary key of the stored row.
    id: str = Field(default_factory=lambda: str(uuid4()))
    conversation_id: str  # Can be task-{uuid} or real conversation UUID
    # Sender role; currently only 'user' is produced by the router.
    role: str = 'user'
    # Ordered message parts (text and/or images), serialized to JSON for storage.
    content: list[TextContent | ImageContent]
    created_at: datetime = Field(default_factory=utc_now)
class PendingMessageResponse(BaseModel):
    """Response when queueing a pending message."""

    # Identifier of the queued PendingMessage.
    id: str
    # Always True on success; kept explicit so clients can assert delivery intent.
    queued: bool
    position: int = Field(description='Position in the queue (1-based)')

View File

@@ -0,0 +1,104 @@
"""REST API router for pending messages."""
import logging
from fastapi import APIRouter, HTTPException, Request, status
from pydantic import TypeAdapter, ValidationError
from openhands.agent_server.models import ImageContent, TextContent
from openhands.app_server.config import depends_pending_message_service
from openhands.app_server.pending_messages.pending_message_models import (
PendingMessageResponse,
)
from openhands.app_server.pending_messages.pending_message_service import (
PendingMessageService,
)
from openhands.server.dependencies import get_dependencies
logger = logging.getLogger(__name__)
# Type adapter for validating content from request
_content_type_adapter = TypeAdapter(list[TextContent | ImageContent])
# Create router with authentication dependencies
router = APIRouter(
prefix='/conversations/{conversation_id}/pending-messages',
tags=['Pending Messages'],
dependencies=get_dependencies(),
)
# Create dependency at module level
pending_message_service_dependency = depends_pending_message_service()
@router.post(
    '', response_model=PendingMessageResponse, status_code=status.HTTP_201_CREATED
)
async def queue_pending_message(
    conversation_id: str,
    request: Request,
    pending_service: PendingMessageService = pending_message_service_dependency,
) -> PendingMessageResponse:
    """Queue a message for delivery when conversation becomes ready.

    This endpoint allows users to submit messages even when the conversation's
    WebSocket connection is not yet established. Messages are stored server-side
    and delivered automatically when the conversation transitions to READY status.

    Args:
        conversation_id: The conversation ID (can be task ID before conversation is ready)
        request: The FastAPI request containing message content
        pending_service: Injected service that persists the queued message.

    Returns:
        PendingMessageResponse with the message ID and queue position

    Raises:
        HTTPException 400: If the request body is invalid
        HTTPException 429: If too many pending messages are queued (limit: 10)
    """
    # Single source of truth for the per-conversation queue limit.
    max_pending_messages = 10

    try:
        body = await request.json()
    except Exception:
        # Original parse error intentionally suppressed; clients only need 400.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Invalid request body',
        ) from None

    # A syntactically valid JSON body may still be a list/scalar; `.get` would
    # raise AttributeError (HTTP 500) on those, so reject non-dict bodies here.
    if not isinstance(body, dict):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='Invalid request body',
        )

    raw_content = body.get('content')
    role = body.get('role', 'user')

    if not raw_content or not isinstance(raw_content, list):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail='content must be a non-empty list',
        )

    # Validate and parse content into typed TextContent/ImageContent objects.
    try:
        content = _content_type_adapter.validate_python(raw_content)
    except ValidationError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f'Invalid content format: {e}',
        ) from e

    # Rate limit: cap pending messages per conversation.
    pending_count = await pending_service.count_pending_messages(conversation_id)
    if pending_count >= max_pending_messages:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=(
                f'Too many pending messages. Maximum {max_pending_messages} '
                'messages per conversation.'
            ),
        )

    response = await pending_service.add_message(
        conversation_id=conversation_id,
        content=content,
        role=role,
    )

    # Lazy %-style args keep formatting off the hot path when INFO is disabled.
    logger.info(
        'Queued pending message %s for conversation %s (position: %s)',
        response.id,
        conversation_id,
        response.position,
    )

    return response

View File

@@ -0,0 +1,200 @@
"""Service for managing pending messages in SQL database."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import AsyncGenerator
from fastapi import Request
from pydantic import TypeAdapter
from sqlalchemy import JSON, Column, String, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from openhands.agent_server.models import ImageContent, TextContent
from openhands.app_server.pending_messages.pending_message_models import (
PendingMessage,
PendingMessageResponse,
)
from openhands.app_server.services.injector import Injector, InjectorState
from openhands.app_server.utils.sql_utils import Base, UtcDateTime
from openhands.sdk.utils.models import DiscriminatedUnionMixin
# Type adapter for deserializing content from JSON
_content_type_adapter = TypeAdapter(list[TextContent | ImageContent])
class StoredPendingMessage(Base):  # type: ignore
    """SQLAlchemy model for pending messages."""

    __tablename__ = 'pending_messages'

    # Primary key; mirrors PendingMessage.id (a stringified UUID4).
    id = Column(String, primary_key=True)
    # Indexed because every query filters by conversation (may be a task id
    # before the real conversation exists).
    conversation_id = Column(String, nullable=False, index=True)
    role = Column(String(20), nullable=False, default='user')
    # JSON list of model_dump()'d TextContent/ImageContent parts.
    content = Column(JSON, nullable=False)
    # Indexed to support ordered retrieval (oldest first).
    created_at = Column(UtcDateTime, server_default=func.now(), index=True)
class PendingMessageService(ABC):
    """Abstract service for managing pending messages.

    Implementations persist messages submitted before a conversation is READY
    and hand them back for delivery once it is.
    """

    @abstractmethod
    async def add_message(
        self,
        conversation_id: str,
        content: list[TextContent | ImageContent],
        role: str = 'user',
    ) -> PendingMessageResponse:
        """Queue a message for delivery when conversation becomes ready."""

    @abstractmethod
    async def get_pending_messages(self, conversation_id: str) -> list[PendingMessage]:
        """Get all pending messages for a conversation, ordered by created_at."""

    @abstractmethod
    async def count_pending_messages(self, conversation_id: str) -> int:
        """Count pending messages for a conversation."""

    @abstractmethod
    async def delete_messages_for_conversation(self, conversation_id: str) -> int:
        """Delete all pending messages for a conversation, returning count deleted."""

    @abstractmethod
    async def update_conversation_id(
        self, old_conversation_id: str, new_conversation_id: str
    ) -> int:
        """Update conversation_id when task-id transitions to real conversation-id.

        Returns the number of messages updated.
        """
@dataclass
class SQLPendingMessageService(PendingMessageService):
    """SQL-backed implementation of :class:`PendingMessageService`."""

    db_session: AsyncSession

    async def add_message(
        self,
        conversation_id: str,
        content: list[TextContent | ImageContent],
        role: str = 'user',
    ) -> PendingMessageResponse:
        """Queue a message for delivery when conversation becomes ready."""
        message = PendingMessage(
            conversation_id=conversation_id,
            role=role,
            content=content,
        )

        # Queue position is one past the number of messages already waiting.
        count_result = await self.db_session.execute(
            select(func.count()).where(
                StoredPendingMessage.conversation_id == conversation_id
            )
        )
        already_queued = count_result.scalar() or 0

        # JSON column cannot hold pydantic objects; persist plain dicts.
        row = StoredPendingMessage(
            id=str(message.id),
            conversation_id=conversation_id,
            role=role,
            content=[part.model_dump() for part in content],
            created_at=message.created_at,
        )
        self.db_session.add(row)
        await self.db_session.commit()

        return PendingMessageResponse(
            id=message.id,
            queued=True,
            position=already_queued + 1,
        )

    async def get_pending_messages(self, conversation_id: str) -> list[PendingMessage]:
        """Get all pending messages for a conversation, ordered by created_at."""
        query = (
            select(StoredPendingMessage)
            .where(StoredPendingMessage.conversation_id == conversation_id)
            .order_by(StoredPendingMessage.created_at.asc())
        )
        rows = (await self.db_session.execute(query)).scalars().all()

        messages: list[PendingMessage] = []
        for row in rows:
            messages.append(
                PendingMessage(
                    id=row.id,
                    conversation_id=row.conversation_id,
                    role=row.role,
                    content=_content_type_adapter.validate_python(row.content),
                    created_at=row.created_at,
                )
            )
        return messages

    async def count_pending_messages(self, conversation_id: str) -> int:
        """Count pending messages for a conversation."""
        result = await self.db_session.execute(
            select(func.count()).where(
                StoredPendingMessage.conversation_id == conversation_id
            )
        )
        return result.scalar() or 0

    async def delete_messages_for_conversation(self, conversation_id: str) -> int:
        """Delete all pending messages for a conversation, returning count deleted."""
        result = await self.db_session.execute(
            select(StoredPendingMessage).where(
                StoredPendingMessage.conversation_id == conversation_id
            )
        )
        rows = result.scalars().all()
        for row in rows:
            await self.db_session.delete(row)
        # Only commit when something actually changed.
        if rows:
            await self.db_session.commit()
        return len(rows)

    async def update_conversation_id(
        self, old_conversation_id: str, new_conversation_id: str
    ) -> int:
        """Update conversation_id when task-id transitions to real conversation-id."""
        result = await self.db_session.execute(
            select(StoredPendingMessage).where(
                StoredPendingMessage.conversation_id == old_conversation_id
            )
        )
        rows = result.scalars().all()
        for row in rows:
            row.conversation_id = new_conversation_id
        if rows:
            await self.db_session.commit()
        return len(rows)
class PendingMessageServiceInjector(
    DiscriminatedUnionMixin, Injector[PendingMessageService], ABC
):
    """Abstract injector for PendingMessageService.

    Concrete subclasses are selected via the discriminated-union mechanism
    provided by DiscriminatedUnionMixin — presumably driven by app config;
    TODO confirm against the config module.
    """

    pass
class SQLPendingMessageServiceInjector(PendingMessageServiceInjector):
    """SQL-based injector for PendingMessageService."""

    async def inject(
        self, state: InjectorState, request: Request | None = None
    ) -> AsyncGenerator[PendingMessageService, None]:
        # Imported locally rather than at module level — presumably to avoid a
        # circular import with the config module; TODO confirm.
        from openhands.app_server.config import get_db_session

        # The yielded service lives for as long as the injected DB session.
        async with get_db_session(state) as db_session:
            yield SQLPendingMessageService(db_session=db_session)

View File

@@ -43,6 +43,16 @@ _logger = logging.getLogger(__name__)
STARTUP_GRACE_SECONDS = 15
def _get_use_host_network_default() -> bool:
"""Get the default value for use_host_network from environment variables.
This function is called at runtime (not at class definition time) to ensure
that environment variable changes are picked up correctly.
"""
value = os.getenv('AGENT_SERVER_USE_HOST_NETWORK', '')
return value.lower() in ('true', '1', 'yes')
class VolumeMount(BaseModel):
"""Mounted volume within the container."""
@@ -197,6 +207,12 @@ class DockerSandboxService(SandboxService):
)
)
if not container.image.tags:
_logger.debug(
f'Skipping container {container.name!r}: image has no tags (image id: {container.image.id})'
)
return None
return SandboxInfo(
id=container.name,
created_by_user_id=None,
@@ -585,18 +601,13 @@ class DockerSandboxServiceInjector(SandboxServiceInjector):
),
)
use_host_network: bool = Field(
default=os.getenv('SANDBOX_USE_HOST_NETWORK', '').lower()
in (
'true',
'1',
'yes',
),
default_factory=_get_use_host_network_default,
description=(
'Whether to use host networking mode for sandbox containers. '
'Whether to use host networking mode for agent-server containers. '
'When enabled, containers share the host network namespace, '
'making all container ports directly accessible on the host. '
'This is useful for reverse proxy setups where dynamic port mapping '
'is problematic. Configure via OH_SANDBOX_USE_HOST_NETWORK environment variable.'
'is problematic. Configure via AGENT_SERVER_USE_HOST_NETWORK environment variable.'
),
)

View File

@@ -53,14 +53,6 @@ from openhands.sdk.utils.paging import page_iterator
_logger = logging.getLogger(__name__)
polling_task: asyncio.Task | None = None
POD_STATUS_MAPPING = {
'ready': SandboxStatus.RUNNING,
'pending': SandboxStatus.STARTING,
'running': SandboxStatus.STARTING,
'failed': SandboxStatus.ERROR,
'unknown': SandboxStatus.ERROR,
'crashloopbackoff': SandboxStatus.ERROR,
}
STATUS_MAPPING = {
'running': SandboxStatus.RUNNING,
'paused': SandboxStatus.PAUSED,
@@ -188,28 +180,22 @@ class RemoteSandboxService(SandboxService):
def _get_sandbox_status_from_runtime(
self, runtime: dict[str, Any] | None
) -> SandboxStatus:
"""Derive a SandboxStatus from the runtime info. The legacy logic for getting
the status of a runtime is inconsistent. It is divided between a "status" which
cannot be trusted (It sometimes returns "running" for cases when the pod is
still starting) and a "pod_status" which is not returned for list
operations."""
"""Derive a SandboxStatus from the runtime info.
The status field is now the source of truth for sandbox status. It accounts
for both pod readiness and ingress availability, making it more reliable than
pod_status which only reflected pod state.
"""
if not runtime:
return SandboxStatus.MISSING
status = None
pod_status = (runtime.get('pod_status') or '').lower()
if pod_status:
status = POD_STATUS_MAPPING.get(pod_status, None)
runtime_status = runtime.get('status')
if runtime_status:
status = STATUS_MAPPING.get(runtime_status.lower(), None)
if status is not None:
return status
# If we failed to get the status from the pod status, fall back to status
if status is None:
runtime_status = runtime.get('status')
if runtime_status:
status = STATUS_MAPPING.get(runtime_status.lower(), None)
if status is None:
return SandboxStatus.MISSING
return status
return SandboxStatus.MISSING
async def _secure_select(self):
query = select(StoredRemoteSandbox)
@@ -514,9 +500,6 @@ class RemoteSandboxService(SandboxService):
session_api_key
)
# Hack - result doesn't contain this
runtime_data['pod_status'] = 'pending'
# Log runtime assignment for observability
runtime_id = runtime_data.get('runtime_id', 'unknown')
_logger.info(f'Started sandbox {sandbox_id} with runtime_id={runtime_id}')

View File

@@ -59,3 +59,20 @@ class SandboxInfo(BaseModel):
class SandboxPage(BaseModel):
items: list[SandboxInfo]
next_page_id: str | None = None
class SecretNameItem(BaseModel):
    """A secret's name and optional description (value NOT included)."""

    name: str = Field(description='The secret name/key')
    description: str | None = Field(
        default=None, description='Optional description of the secret'
    )
class SecretNamesResponse(BaseModel):
    """Response listing available secret names (no raw values)."""

    # Covers both custom secrets and provider-token env keys.
    secrets: list[SecretNameItem] = Field(
        default_factory=list, description='Available secrets'
    )

View File

@@ -1,16 +1,30 @@
"""Runtime Containers router for OpenHands App Server."""
from typing import Annotated
import logging
from typing import Annotated, cast
from fastapi import APIRouter, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status
from fastapi.security import APIKeyHeader
from openhands.agent_server.models import Success
from openhands.app_server.config import depends_sandbox_service
from openhands.app_server.sandbox.sandbox_models import SandboxInfo, SandboxPage
from openhands.app_server.sandbox.sandbox_models import (
SandboxInfo,
SandboxPage,
SecretNameItem,
SecretNamesResponse,
)
from openhands.app_server.sandbox.sandbox_service import (
SandboxService,
)
from openhands.app_server.sandbox.session_auth import validate_session_key
from openhands.app_server.user.auth_user_context import AuthUserContext
from openhands.server.dependencies import get_dependencies
from openhands.server.user_auth.user_auth import (
get_for_user as get_user_auth_for_user,
)
_logger = logging.getLogger(__name__)
# We use the get_dependencies method here to signal to the OpenAPI docs that this endpoint
# is protected. The actual protection is provided by SetAuthCookieMiddleware
@@ -94,3 +108,104 @@ async def delete_sandbox(
if not exists:
raise HTTPException(status.HTTP_404_NOT_FOUND)
return Success()
# ---------------------------------------------------------------------------
# Sandbox-scoped secrets (authenticated via X-Session-API-Key)
# ---------------------------------------------------------------------------
async def _valid_sandbox_from_session_key(
    request: Request,
    sandbox_id: str,
    session_api_key: str = Depends(
        APIKeyHeader(name='X-Session-API-Key', auto_error=False)
    ),
) -> SandboxInfo:
    """Authenticate via ``X-Session-API-Key`` and verify sandbox ownership.

    The key itself is validated by ``validate_session_key``; this wrapper only
    checks that the key maps to the sandbox named in the path.
    """
    sandbox = await validate_session_key(session_api_key)
    if sandbox.id == sandbox_id:
        return sandbox
    raise HTTPException(
        status.HTTP_403_FORBIDDEN,
        detail='Session API key does not match sandbox',
    )
async def _get_user_context(sandbox_info: SandboxInfo) -> AuthUserContext:
    """Build an ``AuthUserContext`` for the sandbox owner.

    Raises 401 when the sandbox has no recorded owner, since secrets can only
    be resolved in the context of a concrete user.
    """
    owner_id = sandbox_info.created_by_user_id
    if not owner_id:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            detail='Sandbox has no associated user',
        )
    user_auth = await get_user_auth_for_user(owner_id)
    return AuthUserContext(user_auth=user_auth)
@router.get('/{sandbox_id}/settings/secrets')
async def list_secret_names(
    sandbox_info: SandboxInfo = Depends(_valid_sandbox_from_session_key),
) -> SecretNamesResponse:
    """List available secret names (no raw values).

    Includes both custom secrets and provider tokens (e.g. github_token).
    """
    user_context = await _get_user_context(sandbox_info)

    # Custom secrets first.
    secret_sources = await user_context.get_secrets()
    entries = [
        SecretNameItem(name=name, description=source.description)
        for name, source in secret_sources.items()
    ]

    # Then provider tokens (github_token, gitlab_token, etc.), exposed under
    # their environment-variable names.
    provider_env_vars = cast(
        dict[str, str] | None,
        await user_context.get_provider_tokens(as_env_vars=True),
    )
    for env_key in provider_env_vars or {}:
        entries.append(
            SecretNameItem(name=env_key, description=f'{env_key} provider token')
        )

    return SecretNamesResponse(secrets=entries)
@router.get('/{sandbox_id}/settings/secrets/{secret_name}')
async def get_secret_value(
    secret_name: str,
    sandbox_info: SandboxInfo = Depends(_valid_sandbox_from_session_key),
) -> Response:
    """Return a single secret value as plain text.

    Called by ``LookupSecret`` inside the sandbox. Checks custom secrets
    first, then falls back to provider tokens — always resolving the
    latest token at call time.
    """
    user_context = await _get_user_context(sandbox_info)

    # 1) Custom secrets.
    secret_sources = await user_context.get_secrets()
    source = secret_sources.get(secret_name)
    if source is not None:
        value = source.get_value()
        if value is None:
            raise HTTPException(status.HTTP_404_NOT_FOUND, detail='Secret has no value')
        return Response(content=value, media_type='text/plain')

    # 2) Provider tokens, resolved fresh on every request.
    provider_env_vars = cast(
        dict[str, str] | None,
        await user_context.get_provider_tokens(as_env_vars=True),
    )
    token_value = (provider_env_vars or {}).get(secret_name)
    if token_value is not None:
        return Response(content=token_value, media_type='text/plain')

    raise HTTPException(status.HTTP_404_NOT_FOUND, detail='Secret not found')

View File

@@ -13,7 +13,7 @@ from openhands.sdk.utils.models import DiscriminatedUnionMixin
# The version of the agent server to use for deployments.
# Typically this will be the same as the values from the pyproject.toml
AGENT_SERVER_IMAGE = 'ghcr.io/openhands/agent-server:2c1e72a-python'
AGENT_SERVER_IMAGE = 'ghcr.io/openhands/agent-server:1.14.0-python'
class SandboxSpecService(ABC):
@@ -69,27 +69,58 @@ def get_agent_server_image() -> str:
return AGENT_SERVER_IMAGE
# Prefixes for environment variables that should be auto-forwarded to agent-server
# These are typically configuration variables that affect the agent's behavior
AUTO_FORWARD_PREFIXES = ('LLM_',)
def get_agent_server_env() -> dict[str, str]:
"""Get environment variables to be injected into agent server sandbox environments.
This function reads environment variable overrides from the OH_AGENT_SERVER_ENV
environment variable, which should contain a JSON string mapping variable names
to their values.
This function combines two sources of environment variables:
1. **Auto-forwarded variables**: Environment variables with certain prefixes
(e.g., LLM_*) are automatically forwarded to the agent-server container.
This ensures that LLM configuration like timeouts and retry settings
work correctly in the two-container V1 architecture.
2. **Explicit overrides via OH_AGENT_SERVER_ENV**: A JSON string that allows
setting arbitrary environment variables in the agent-server container.
Values set here take precedence over auto-forwarded variables.
Auto-forwarded prefixes:
- LLM_* : LLM configuration (timeout, retries, model settings, etc.)
Usage:
Set OH_AGENT_SERVER_ENV to a JSON string:
OH_AGENT_SERVER_ENV='{"DEBUG": "true", "LOG_LEVEL": "info", "CUSTOM_VAR": "value"}'
# Auto-forwarding (no action needed):
export LLM_TIMEOUT=3600
export LLM_NUM_RETRIES=10
# These will automatically be available in the agent-server
This will inject the following environment variables into all sandbox environments:
- DEBUG=true
- LOG_LEVEL=info
- CUSTOM_VAR=value
# Explicit override via JSON:
OH_AGENT_SERVER_ENV='{"DEBUG": "true", "CUSTOM_VAR": "value"}'
# Override an auto-forwarded variable:
export LLM_TIMEOUT=3600 # Would be auto-forwarded as 3600
OH_AGENT_SERVER_ENV='{"LLM_TIMEOUT": "7200"}' # Overrides to 7200
Returns:
dict[str, str]: Dictionary of environment variable names to values.
Returns empty dict if OH_AGENT_SERVER_ENV is not set or invalid.
Returns empty dict if no variables are found.
Raises:
JSONDecodeError: If OH_AGENT_SERVER_ENV contains invalid JSON.
"""
return env_parser.from_env(dict[str, str], 'OH_AGENT_SERVER_ENV')
result: dict[str, str] = {}
# Step 1: Auto-forward environment variables with recognized prefixes
for key, value in os.environ.items():
if any(key.startswith(prefix) for prefix in AUTO_FORWARD_PREFIXES):
result[key] = value
# Step 2: Apply explicit overrides from OH_AGENT_SERVER_ENV
# These take precedence over auto-forwarded variables
explicit_env = env_parser.from_env(dict[str, str], 'OH_AGENT_SERVER_ENV')
result.update(explicit_env)
return result

View File

@@ -0,0 +1,66 @@
"""Shared session-key authentication for sandbox-scoped endpoints.
Both the sandbox router and the user router need to validate
``X-Session-API-Key`` headers. This module centralises that logic so
it lives in exactly one place.
The ``InjectorState`` + ``ADMIN`` pattern used here is established in
``webhook_router.py`` — the sandbox service requires an admin context to
look up sandboxes across all users by session key, but the session key
itself acts as the proof of access.
"""
import logging
from fastapi import HTTPException, status
from openhands.app_server.config import get_global_config, get_sandbox_service
from openhands.app_server.sandbox.sandbox_models import SandboxInfo
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.user.specifiy_user_context import ADMIN, USER_CONTEXT_ATTR
from openhands.server.types import AppMode
_logger = logging.getLogger(__name__)
async def validate_session_key(session_api_key: str | None) -> SandboxInfo:
    """Validate an ``X-Session-API-Key`` and return the associated sandbox.

    Raises:
        HTTPException(401): if the key is missing or does not map to a sandbox.
        HTTPException(401): in SAAS mode if the sandbox has no owning user.
    """
    if not session_api_key:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            detail='X-Session-API-Key header is required',
        )

    # The sandbox service is scoped to users. To look up a sandbox by session
    # key (which could belong to *any* user) we need an admin context. This
    # is the same pattern used in webhook_router.valid_sandbox().
    admin_state = InjectorState()
    setattr(admin_state, USER_CONTEXT_ATTR, ADMIN)
    async with get_sandbox_service(admin_state) as sandbox_service:
        sandbox = await sandbox_service.get_sandbox_by_session_api_key(
            session_api_key
        )

    if sandbox is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED, detail='Invalid session API key'
        )

    # Ownerless sandboxes are only tolerated outside SAAS mode; short-circuit
    # keeps the config lookup off the common (owned-sandbox) path.
    if not sandbox.created_by_user_id and get_global_config().app_mode == AppMode.SAAS:
        _logger.error(
            'Sandbox had no user specified',
            extra={'sandbox_id': sandbox.id},
        )
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            detail='Sandbox had no user specified',
        )

    return sandbox

View File

@@ -126,10 +126,11 @@ class DbSessionInjector(BaseModel, Injector[async_sessionmaker]):
async def _create_async_gcp_creator(self):
from sqlalchemy.dialects.postgresql.asyncpg import (
AsyncAdapt_asyncpg_connection,
AsyncAdapt_asyncpg_dbapi,
)
return AsyncAdapt_asyncpg_connection(
asyncpg,
AsyncAdapt_asyncpg_dbapi(asyncpg),
await self._create_async_gcp_db_connection(),
prepared_statement_cache_size=100,
)
@@ -137,11 +138,14 @@ class DbSessionInjector(BaseModel, Injector[async_sessionmaker]):
async def _create_async_gcp_engine(self):
from sqlalchemy.dialects.postgresql.asyncpg import (
AsyncAdapt_asyncpg_connection,
AsyncAdapt_asyncpg_dbapi,
)
dbapi = AsyncAdapt_asyncpg_dbapi(asyncpg)
def adapted_creator():
return AsyncAdapt_asyncpg_connection(
asyncpg,
dbapi,
await_only(self._create_async_gcp_db_connection()),
prepared_statement_cache_size=100,
)

View File

@@ -48,8 +48,27 @@ class AuthUserContext(UserContext):
self._user_info = user_info
return user_info
async def get_provider_tokens(self) -> PROVIDER_TOKEN_TYPE | None:
return await self.user_auth.get_provider_tokens()
async def get_provider_tokens(
self, as_env_vars: bool = False
) -> PROVIDER_TOKEN_TYPE | dict[str, str] | None:
"""Return provider tokens.
Args:
as_env_vars: When True, return a ``dict[str, str]`` mapping env
var names (e.g. ``github_token``) to plain-text token values,
resolving the latest value at call time. When False (default),
return the raw ``dict[ProviderType, ProviderToken]``.
"""
provider_tokens = await self.user_auth.get_provider_tokens()
if not as_env_vars:
return provider_tokens
results: dict[str, str] = {}
if provider_tokens:
for provider_type, provider_token in provider_tokens.items():
if provider_token.token:
env_key = ProviderHandler.get_provider_env_key(provider_type)
results[env_key] = provider_token.token.get_secret_value()
return results
async def get_provider_handler(self):
provider_handler = self._provider_handler
@@ -79,9 +98,9 @@ class AuthUserContext(UserContext):
return token
async def get_secrets(self) -> dict[str, SecretSource]:
results = {}
results: dict[str, SecretSource] = {}
# Include custom secrets...
# Include custom secrets
secrets = await self.user_auth.get_secrets()
if secrets:
for name, custom_secret in secrets.custom_secrets.items():

View File

@@ -26,7 +26,9 @@ class SpecifyUserContext(UserContext):
) -> str:
raise NotImplementedError()
async def get_provider_tokens(self) -> PROVIDER_TOKEN_TYPE | None:
async def get_provider_tokens(
self, as_env_vars: bool = False
) -> PROVIDER_TOKEN_TYPE | dict[str, str] | None:
raise NotImplementedError()
async def get_latest_token(self, provider_type: ProviderType) -> str | None:

View File

@@ -35,8 +35,16 @@ class UserContext(ABC):
"""
@abstractmethod
async def get_provider_tokens(self) -> PROVIDER_TOKEN_TYPE | None:
"""Get the latest tokens for all provider types"""
async def get_provider_tokens(
self, as_env_vars: bool = False
) -> PROVIDER_TOKEN_TYPE | dict[str, str] | None:
"""Get the latest tokens for all provider types.
Args:
as_env_vars: When True, return a ``dict[str, str]`` mapping env
var names (e.g. ``github_token``) to plain-text token values.
When False (default), return the raw provider token mapping.
"""
@abstractmethod
async def get_latest_token(self, provider_type: ProviderType) -> str | None:

View File

@@ -1,12 +1,18 @@
"""User router for OpenHands App Server. For the moment, this simply implements the /me endpoint."""
from fastapi import APIRouter, HTTPException, status
import logging
from fastapi import APIRouter, Header, HTTPException, Query, status
from fastapi.responses import JSONResponse
from openhands.app_server.config import depends_user_context
from openhands.app_server.sandbox.session_auth import validate_session_key
from openhands.app_server.user.user_context import UserContext
from openhands.app_server.user.user_models import UserInfo
from openhands.server.dependencies import get_dependencies
_logger = logging.getLogger(__name__)
# We use the get_dependencies method here to signal to the OpenAPI docs that this endpoint
# is protected. The actual protection is provided by SetAuthCookieMiddleware
router = APIRouter(prefix='/users', tags=['User'], dependencies=get_dependencies())
@@ -18,9 +24,52 @@ user_dependency = depends_user_context()
@router.get('/me')
async def get_current_user(
user_context: UserContext = user_dependency,
expose_secrets: bool = Query(
default=False,
description='If true, return unmasked secret values (e.g. llm_api_key). '
'Requires a valid X-Session-API-Key header for an active sandbox '
'owned by the authenticated user.',
),
x_session_api_key: str | None = Header(default=None),
) -> UserInfo:
"""Get the current authenticated user."""
user = await user_context.get_user_info()
if user is None:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, detail='Not authenticated')
if expose_secrets:
await _validate_session_key_ownership(user_context, x_session_api_key)
return JSONResponse( # type: ignore[return-value]
content=user.model_dump(mode='json', context={'expose_secrets': True})
)
return user
async def _validate_session_key_ownership(
user_context: UserContext,
session_api_key: str | None,
) -> None:
"""Verify the session key belongs to a sandbox owned by the caller.
Raises ``HTTPException`` if the key is missing, invalid, or belongs
to a sandbox owned by a different user.
"""
sandbox_info = await validate_session_key(session_api_key)
# Verify the sandbox is owned by the authenticated user.
caller_id = await user_context.get_user_id()
if not caller_id:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
detail='Cannot determine authenticated user',
)
if sandbox_info.created_by_user_id != caller_id:
_logger.warning(
'Session key user mismatch: sandbox owner=%s, caller=%s',
sandbox_info.created_by_user_id,
caller_id,
)
raise HTTPException(
status.HTTP_403_FORBIDDEN,
detail='Session API key does not belong to the authenticated user',
)

View File

@@ -5,6 +5,9 @@ from openhands.app_server.event import event_router
from openhands.app_server.event_callback import (
webhook_router,
)
from openhands.app_server.pending_messages.pending_message_router import (
router as pending_message_router,
)
from openhands.app_server.sandbox import sandbox_router, sandbox_spec_router
from openhands.app_server.user import user_router
from openhands.app_server.web_client import web_client_router
@@ -13,6 +16,7 @@ from openhands.app_server.web_client import web_client_router
router = APIRouter(prefix='/api/v1')
router.include_router(event_router.router)
router.include_router(app_conversation_router.router)
router.include_router(pending_message_router)
router.include_router(sandbox_router.router)
router.include_router(sandbox_spec_router.router)
router.include_router(user_router.router)

View File

@@ -0,0 +1,10 @@
# OpenHands Architecture
Architecture diagrams and explanations for the OpenHands system.
## Documentation Sections
- [System Architecture Overview](./system-architecture.md) - Multi-tier architecture and component responsibilities
- [Conversation Startup & WebSocket Flow](./conversation-startup.md) - Runtime provisioning and real-time communication
- [Agent Execution & LLM Flow](./agent-execution.md) - LLM integration and action execution loop
- [Observability](./observability.md) - Logging, metrics, and monitoring

View File

@@ -0,0 +1,92 @@
# Agent Execution & LLM Flow
When the agent executes inside the sandbox, it makes LLM calls through LiteLLM:
```mermaid
sequenceDiagram
autonumber
participant User as User (Browser)
participant AS as Agent Server
participant Agent as Agent<br/>(CodeAct)
participant LLM as LLM Class
participant Lite as LiteLLM
participant Proxy as LLM Proxy<br/>(llm-proxy.app.all-hands.dev)
participant Provider as LLM Provider<br/>(OpenAI, Anthropic, etc.)
participant AES as Action Execution Server
Note over User,AES: Agent Loop - LLM Call Flow
User->>AS: WebSocket: User message
AS->>Agent: Process message
Note over Agent: Build prompt from state
Agent->>LLM: completion(messages, tools)
Note over LLM: Apply config (model, temp, etc.)
alt Using OpenHands Provider
LLM->>Lite: litellm_proxy/{model}
Lite->>Proxy: POST /chat/completions
Note over Proxy: Auth, rate limit, routing
Proxy->>Provider: Forward request
Provider-->>Proxy: Response
Proxy-->>Lite: Response
else Using Direct Provider
LLM->>Lite: {provider}/{model}
Lite->>Provider: Direct API call
Provider-->>Lite: Response
end
Lite-->>LLM: ModelResponse
Note over LLM: Track metrics (cost, tokens)
LLM-->>Agent: Parsed response
Note over Agent: Parse action from response
AS->>User: WebSocket: Action event
Note over User,AES: Action Execution
AS->>AES: HTTP: Execute action
Note over AES: Run command/edit file
AES-->>AS: Observation
AS->>User: WebSocket: Observation event
Note over Agent: Update state
Note over Agent: Loop continues...
```
### LLM Components
| Component | Purpose | Location |
|-----------|---------|----------|
| **LLM Class** | Wrapper with retries, metrics, config | `openhands/llm/llm.py` |
| **LiteLLM** | Universal LLM API adapter | External library |
| **LLM Proxy** | OpenHands managed proxy for billing/routing | `llm-proxy.app.all-hands.dev` |
| **LLM Registry** | Manages multiple LLM instances | `openhands/llm/llm_registry.py` |
### Model Routing
```
User selects model
┌───────────────────┐
│ Model prefix? │
└───────────────────┘
├── openhands/claude-3-5 ──► Rewrite to litellm_proxy/claude-3-5
│ Base URL: llm-proxy.app.all-hands.dev
├── anthropic/claude-3-5 ──► Direct to Anthropic API
│ (User's API key)
├── openai/gpt-4 ──► Direct to OpenAI API
│ (User's API key)
└── azure/gpt-4 ──► Direct to Azure OpenAI
(User's API key + endpoint)
```
### LLM Proxy
When using `openhands/` prefixed models, requests are routed through a managed proxy.
See the [OpenHands documentation](https://docs.openhands.dev/) for details on supported models.

View File

@@ -0,0 +1,68 @@
# Conversation Startup & WebSocket Flow
When a user starts a conversation, this sequence occurs:
```mermaid
sequenceDiagram
autonumber
participant User as User (Browser)
participant App as App Server
participant SS as Sandbox Service
participant RAPI as Runtime API
participant Pool as Warm Pool
participant Sandbox as Sandbox (Container)
participant AS as Agent Server
participant AES as Action Execution Server
Note over User,AES: Phase 1: Conversation Creation
User->>App: POST /api/conversations
Note over App: Authenticate user
App->>SS: Create sandbox
Note over SS,Pool: Phase 2: Runtime Provisioning
SS->>RAPI: POST /start (image, env, config)
RAPI->>Pool: Check for warm runtime
alt Warm runtime available
Pool-->>RAPI: Return warm runtime
Note over RAPI: Assign to session
else No warm runtime
RAPI->>Sandbox: Create new container
Sandbox->>AS: Start Agent Server
Sandbox->>AES: Start Action Execution Server
AES-->>AS: Ready
end
RAPI-->>SS: Runtime URL + session API key
SS-->>App: Sandbox info
App-->>User: Conversation ID + Sandbox URL
Note over User,AES: Phase 3: Direct WebSocket Connection
User->>AS: WebSocket: /sockets/events/{id}
AS-->>User: Connection accepted
AS->>User: Replay historical events
Note over User,AES: Phase 4: User Sends Message
User->>AS: WebSocket: SendMessageRequest
Note over AS: Agent processes message
Note over AS: LLM call → generate action
Note over User,AES: Phase 5: Action Execution Loop
loop Agent Loop
AS->>AES: HTTP: Execute action
Note over AES: Run in sandbox
AES-->>AS: Observation result
AS->>User: WebSocket: Event update
Note over AS: Update state, next action
end
Note over User,AES: Phase 6: Task Complete
AS->>User: WebSocket: AgentStateChanged (FINISHED)
```
### Key Points
1. **Initial Setup via App Server**: The App Server handles authentication and coordinates with the Sandbox Service
2. **Runtime API Provisioning**: The Sandbox Service calls the Runtime API, which checks for warm runtimes before creating new containers
3. **Warm Pool Optimization**: Pre-warmed runtimes reduce startup latency significantly
4. **Direct WebSocket to Sandbox**: Once created, the user's browser connects **directly** to the Agent Server inside the sandbox
5. **App Server Not in Hot Path**: After connection, all real-time communication bypasses the App Server entirely
6. **Agent Server Orchestrates**: The Agent Server manages the AI loop, calling the Action Execution Server for actual command execution

View File

@@ -0,0 +1,85 @@
# Observability
OpenHands provides structured logging and metrics collection for monitoring and debugging.
> **SDK Documentation**: For detailed guidance on observability and metrics in agent development, see:
> - [SDK Observability Guide](https://docs.openhands.dev/sdk/guides/observability)
> - [SDK Metrics Guide](https://docs.openhands.dev/sdk/guides/metrics)
```mermaid
flowchart LR
subgraph Sources["Sources"]
Agent["Agent Server"]
App["App Server"]
Frontend["Frontend"]
end
subgraph Collection["Collection"]
JSONLog["JSON Logs<br/>(stdout)"]
Metrics["Metrics<br/>(Internal)"]
end
subgraph External["External (Optional)"]
LogAgg["Log Aggregator"]
Analytics["Analytics Service"]
end
Agent --> JSONLog
App --> JSONLog
App --> Metrics
JSONLog --> LogAgg
Frontend --> Analytics
```
### Structured Logging
OpenHands uses Python's standard logging library with structured JSON output support.
| Component | Format | Destination | Purpose |
|-----------|--------|-------------|---------|
| **Application Logs** | JSON (when `LOG_JSON=1`) | stdout | Debugging, error tracking |
| **Access Logs** | JSON (Uvicorn) | stdout | Request tracing |
| **LLM Debug Logs** | Plain text | File (optional) | LLM call debugging |
### JSON Log Format
When `LOG_JSON=1` is set, logs are emitted as single-line JSON for ingestion by log aggregators:
```json
{
"message": "Conversation started",
"severity": "INFO",
"conversation_id": "abc-123",
"user_id": "user-456",
"timestamp": "2024-01-15T10:30:00Z"
}
```
Additional context can be added using Python's logger `extra=` parameter (see [Python logging docs](https://docs.python.org/3/library/logging.html)).
### Metrics
| Metric | Tracked By | Storage | Purpose |
|--------|------------|---------|---------|
| **LLM Cost** | `Metrics` class | Conversation stats file | Billing, budget limits |
| **Token Usage** | `Metrics` class | Conversation stats file | Usage analytics |
| **Response Latency** | `Metrics` class | Conversation stats file | Performance monitoring |
### Conversation Stats Persistence
Per-conversation metrics are persisted for analytics:
```python
# Location: openhands/server/services/conversation_stats.py
ConversationStats:
- service_to_metrics: Dict[str, Metrics]
- accumulated_cost: float
- token_usage: TokenUsage
# Stored at: {file_store}/conversation_stats/{conversation_id}.pkl
```
### Integration with External Services
Structured JSON logging allows integration with any log aggregation service (e.g., ELK Stack, Loki, Splunk). Configure your log collector to ingest from container stdout/stderr.

View File

@@ -0,0 +1,88 @@
# System Architecture Overview
OpenHands supports multiple deployment configurations. This document describes the core components and how they interact.
## Local/Docker Deployment
The simplest deployment runs everything locally or in Docker containers:
```mermaid
flowchart TB
subgraph Server["OpenHands Server"]
API["REST API<br/>(FastAPI)"]
ConvMgr["Conversation<br/>Manager"]
Runtime["Runtime<br/>Manager"]
end
subgraph Sandbox["Sandbox (Docker Container)"]
AES["Action Execution<br/>Server"]
Browser["Browser<br/>Environment"]
FS["File System"]
end
User["User"] -->|"HTTP/WebSocket"| API
API --> ConvMgr
ConvMgr --> Runtime
Runtime -->|"Provision"| Sandbox
Server -->|"Execute actions"| AES
AES --> Browser
AES --> FS
```
### Core Components
| Component | Purpose | Location |
|-----------|---------|----------|
| **Server** | REST API, conversation management, runtime orchestration | `openhands/server/` |
| **Runtime** | Abstract interface for sandbox execution | `openhands/runtime/` |
| **Action Execution Server** | Execute bash, file ops, browser actions | Inside sandbox |
| **EventStream** | Central event bus for all communication | `openhands/events/` |
## Scalable Deployment
For production deployments, OpenHands can be configured with a separate Runtime API service:
```mermaid
flowchart TB
subgraph AppServer["App Server"]
API["REST API"]
ConvMgr["Conversation<br/>Manager"]
end
subgraph RuntimeAPI["Runtime API (Optional)"]
RuntimeMgr["Runtime<br/>Manager"]
WarmPool["Warm Pool"]
end
subgraph Sandbox["Sandbox"]
AS["Agent Server"]
AES["Action Execution<br/>Server"]
end
User["User"] -->|"HTTP"| API
API --> ConvMgr
ConvMgr -->|"Provision"| RuntimeMgr
RuntimeMgr --> WarmPool
RuntimeMgr --> Sandbox
User -.->|"WebSocket"| AS
AS -->|"HTTP"| AES
```
This configuration enables:
- **Warm pool**: Pre-provisioned runtimes for faster startup
- **Direct WebSocket**: Users connect directly to their sandbox, bypassing the App Server
- **Horizontal scaling**: App Server and Runtime API can scale independently
### Runtime Options
OpenHands supports multiple runtime implementations:
| Runtime | Use Case |
|---------|----------|
| **DockerRuntime** | Local development, single-machine deployments |
| **RemoteRuntime** | Connect to externally managed sandboxes |
| **ModalRuntime** | Serverless execution via Modal |
See the [Runtime documentation](https://docs.openhands.dev/usage/architecture/runtime) for details.

View File

@@ -216,13 +216,18 @@ class BitbucketDCMixinBase(BaseGitService, HTTPClient):
)
async def _parse_repository(
self, repo: dict, link_header: str | None = None
self,
repo: dict,
link_header: str | None = None,
fetch_default_branch: bool = False,
) -> Repository:
"""Parse a Bitbucket data center API repository response into a Repository object.
Args:
repo: Repository data from Bitbucket data center API
link_header: Optional link header for pagination
fetch_default_branch: Whether to make an additional API call to fetch the
default branch. Set to False for listing endpoints to avoid N+1 queries.
Returns:
Repository object
@@ -240,14 +245,15 @@ class BitbucketDCMixinBase(BaseGitService, HTTPClient):
is_public = repo.get('public', False)
main_branch: str | None = None
try:
default_branch_url = (
f'{self._repo_api_base(project_key, repo_slug)}/default-branch'
)
default_branch_data, _ = await self._make_request(default_branch_url)
main_branch = default_branch_data.get('displayId') or None
except Exception as e:
logger.debug(f'Could not fetch default branch for {full_name}: {e}')
if fetch_default_branch:
try:
default_branch_url = (
f'{self._repo_api_base(project_key, repo_slug)}/default-branch'
)
default_branch_data, _ = await self._make_request(default_branch_url)
main_branch = default_branch_data.get('displayId') or None
except Exception as e:
logger.debug(f'Could not fetch default branch for {full_name}: {e}')
return Repository(
id=str(repo.get('id', '')),
@@ -275,7 +281,7 @@ class BitbucketDCMixinBase(BaseGitService, HTTPClient):
owner, repo = self._extract_owner_and_repo(repository)
url = self._repo_api_base(owner, repo)
data, _ = await self._make_request(url)
return await self._parse_repository(data)
return await self._parse_repository(data, fetch_default_branch=True)
async def _get_cursorrules_url(self, repository: str) -> str:
"""Get the URL for checking .cursorrules file."""

View File

@@ -471,7 +471,7 @@ class ProviderHandler:
def check_cmd_action_for_provider_token_ref(
cls, event: Action
) -> list[ProviderType]:
"""Detect if agent run action is using a provider token (e.g $GITHUB_TOKEN)
"""Detect if agent run action is using a provider token (e.g github_token)
Returns a list of providers which are called by the agent
"""
if not isinstance(event, CmdRunAction):

View File

@@ -29,3 +29,14 @@ For reference, here are the previous comments on the issue:
# Final Checklist
Re-read the issue title, body, and comments and make sure that you have successfully implemented all requirements.
Use the $GITHUB_TOKEN and GitHub APIs to
1. Create a new branch using `openhands/` as a prefix (e.g. `openhands/update-readme`)
2. Commit your changes with a clear commit message
3. Push the branch to GitHub
4. Use the `create_pr` tool to open a new PR
5. The PR description should:
- Follow the repository's PR template (check `.github/pull_request_template.md` if it exists)
- Mention that it "fixes" or "closes" the issue number
- Include all required sections from the template

View File

@@ -2,8 +2,9 @@ Please send a final message summarizing your work.
If you simply answered a question, this final message should re-state the answer to the question.
If you made changes, please first double-check the git diff, think carefully about the user's request(s), and check:
If you made changes, please think carefully about the user's request(s) and summarize:
1. whether the request has been completely addressed and all of the instructions have been followed faithfully (in checklist format if appropriate).
2. whether the changes are concise (if there are any extraneous changes not important to addressing the user's request they should be reverted).
3. focus only on summarizing new changes since your last summary, avoiding repetition of information already covered in previous summaries.
If the request has been addressed and the changes are concise, then push your changes to the remote branch and send a final message summarizing the changes.
Please summarize in whatever language the user is using.

View File

@@ -115,6 +115,7 @@ REASONING_EFFORT_PATTERNS: list[str] = [
'o4-mini-2025-04-16',
'gemini-2.5-flash',
'gemini-2.5-pro',
'gemini-3.1-pro*',
'gpt-5*',
# DeepSeek reasoning family
'deepseek-r1-0528*',
@@ -139,6 +140,7 @@ PROMPT_CACHE_PATTERNS: list[str] = [
'claude-3-opus-20240229',
'claude-sonnet-4*',
'claude-opus-4*',
'gemini-3.1-pro*',
# Kimi series - verified via litellm config
'kimi-k2.5',
# GLM series - verified via litellm config

View File

@@ -8,7 +8,7 @@
from abc import ABC, abstractmethod
from typing import Any
from pydantic import BaseModel
from pydantic import BaseModel, field_validator
class ReviewThread(BaseModel):
@@ -21,7 +21,13 @@ class Issue(BaseModel):
repo: str
number: int
title: str
body: str
body: str = ''
@field_validator('body', mode='before')
@classmethod
def body_must_not_be_none(cls, v: str | None) -> str:
return v if v is not None else ''
thread_comments: list[str] | None = None # Added field for issue thread comments
closing_issues: list[str] | None = None
review_comments: list[str] | None = None

View File

@@ -115,10 +115,10 @@ def check_dependencies(code_repo_path: str, check_browser: bool) -> None:
session = server.new_session(session_name='test-session')
except Exception:
raise ValueError('tmux is not properly installed or available on the path.')
pane = session.attached_pane
pane = session.active_pane
pane.send_keys('echo "test"')
pane_output = '\n'.join(pane.cmd('capture-pane', '-p').stdout)
session.kill_session()
session.kill()
if 'test' not in pane_output:
raise ValueError('libtmux is not properly installed. ' + ERROR_MESSAGE)

View File

@@ -12,6 +12,7 @@ NOTE: Since this is run as a script, there should be no imports from project fil
import json
import os
import shlex
import subprocess
import sys
from pathlib import Path
@@ -81,6 +82,11 @@ def get_valid_ref(repo_dir: str) -> str | None:
return None
def _make_git_show_cmd(ref: str, repo_relative_path: str) -> str:
"""Return a git-show shell command with the ref:path argument safely quoted."""
return f'git show {shlex.quote(f"{ref}:{repo_relative_path}")}'
def get_git_diff(relative_file_path: str) -> dict[str, str]:
path = Path(os.getcwd(), relative_file_path).resolve()
if os.path.getsize(path) > MAX_FILE_SIZE_FOR_GIT_DIFF:
@@ -89,13 +95,17 @@ def get_git_diff(relative_file_path: str) -> dict[str, str]:
if not closest_git_repo:
raise ValueError('no_repository')
current_rev = get_valid_ref(str(closest_git_repo))
try:
original = run(
f'git show "{current_rev}:{path.relative_to(closest_git_repo)}"',
str(closest_git_repo),
)
except RuntimeError:
original = ''
original = ''
if current_rev is not None:
try:
original = run(
_make_git_show_cmd(
current_rev, str(path.relative_to(closest_git_repo))
),
str(closest_git_repo),
)
except RuntimeError:
pass
try:
with open(path, 'r') as f:
modified = '\n'.join(f.read().splitlines())

View File

@@ -6,6 +6,7 @@
# Unless you are working on deprecation, please avoid extending this legacy file and consult the V1 codepaths above.
# Tag: Legacy-V0
import json
import shlex
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
@@ -14,9 +15,7 @@ from openhands.core.logger import openhands_logger as logger
from openhands.runtime.utils import git_changes, git_diff
GIT_CHANGES_CMD = 'python3 /openhands/code/openhands/runtime/utils/git_changes.py'
GIT_DIFF_CMD = (
'python3 /openhands/code/openhands/runtime/utils/git_diff.py "{file_path}"'
)
GIT_DIFF_CMD = 'python3 /openhands/code/openhands/runtime/utils/git_diff.py {file_path}'
GIT_BRANCH_CMD = 'git branch --show-current'
@@ -138,7 +137,9 @@ class GitHandler:
if not self.cwd:
raise ValueError('no_dir_in_git_diff')
result = self.execute(self.git_diff_cmd.format(file_path=file_path), self.cwd)
result = self.execute(
self.git_diff_cmd.format(file_path=shlex.quote(file_path)), self.cwd
)
if result.exit_code == 0:
diff = json.loads(result.content, strict=False)
return diff
@@ -150,7 +151,7 @@ class GitHandler:
# We try to add a script for getting git changes to the runtime - legacy runtimes may be missing the script
logger.info('GitHandler:get_git_diff: adding git_diff script to runtime...')
script_file = self._create_python_script_file(git_diff.__file__)
self.git_diff_cmd = f'python3 {script_file} "{{file_path}}"'
self.git_diff_cmd = f'python3 {script_file} {{file_path}}'
# Try again with the new changes cmd
return self.get_git_diff(file_path)

View File

@@ -45,7 +45,15 @@ RUN apt-get update && \
libasound2-plugins libatomic1 && \
(apt-get install -y --no-install-recommends libgl1 || apt-get install -y --no-install-recommends libgl1-mesa-glx) && \
# Install Docker dependencies
apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl gnupg lsb-release
apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl gnupg lsb-release && \
# Security upgrade: patch ImageMagick CVEs (CVE-2026-25897, CVE-2026-25968, CVE-2026-26284, et al.)
(apt-get install -y --no-install-recommends --only-upgrade \
imagemagick imagemagick-7-common imagemagick-7.q16 \
libmagickcore-7-arch-config libmagickcore-7-headers \
libmagickcore-7.q16-10 libmagickcore-7.q16-10-extra \
libmagickcore-7.q16-dev libmagickcore-dev \
libmagickwand-7-headers libmagickwand-7.q16-10 \
libmagickwand-7.q16-dev libmagickwand-dev || true)
{% endif %}
{% if (('ubuntu' in base_image) or ('mswebench' in base_image)) %}

View File

@@ -6,6 +6,7 @@
# Unless you are working on deprecation, please avoid extending this legacy file and consult the V1 codepaths above.
# Tag: Legacy-V0
import re
import time
import uuid
from typing import Any
@@ -71,15 +72,16 @@ class InvariantAnalyzer(SecurityAnalyzer):
else:
self.container = running_containers[0]
elapsed = 0
start_time = time.time()
while self.container.status != 'running':
self.container = self.docker_client.containers.get(self.container_name)
elapsed += 1
elapsed = time.time() - start_time
logger.debug(
f'waiting for container to start: {elapsed}, container status: {self.container.status}'
f'waiting for container to start: {elapsed:.1f}s, container status: {self.container.status}'
)
if elapsed > self.timeout:
break
time.sleep(0.5)
self.api_port = int(
self.container.attrs['NetworkSettings']['Ports']['8000/tcp'][0]['HostPort']

View File

@@ -41,7 +41,7 @@ from openhands.app_server.config import (
depends_httpx_client,
depends_sandbox_service,
)
from openhands.app_server.sandbox.sandbox_models import SandboxStatus
from openhands.app_server.sandbox.sandbox_models import AGENT_SERVER, SandboxStatus
from openhands.app_server.sandbox.sandbox_service import SandboxService
from openhands.app_server.services.db_session_injector import set_db_session_keep_open
from openhands.app_server.services.httpx_client_injector import (
@@ -614,7 +614,7 @@ async def _try_delete_v1_conversation(
# Delete the sandbox in the background
asyncio.create_task(
_delete_sandbox_and_close_connections(
_finalize_delete_and_close_connections(
sandbox_service,
app_conversation_info.sandbox_id,
db_session,
@@ -628,14 +628,18 @@ async def _try_delete_v1_conversation(
return result
async def _delete_sandbox_and_close_connections(
async def _finalize_delete_and_close_connections(
sandbox_service: SandboxService,
sandbox_id: str,
db_session: AsyncSession,
httpx_client: httpx.AsyncClient,
):
try:
await sandbox_service.delete_sandbox(sandbox_id)
num_conversations_in_sandbox = await _get_num_conversations_in_sandbox(
sandbox_service, sandbox_id, httpx_client
)
if num_conversations_in_sandbox == 0:
await sandbox_service.delete_sandbox(sandbox_id)
await db_session.commit()
finally:
await asyncio.gather(
@@ -646,6 +650,28 @@ async def _delete_sandbox_and_close_connections(
)
async def _get_num_conversations_in_sandbox(
sandbox_service: SandboxService,
sandbox_id: str,
httpx_client: httpx.AsyncClient,
) -> int:
try:
sandbox = await sandbox_service.get_sandbox(sandbox_id)
if not sandbox or not sandbox.exposed_urls:
return 0
agent_server_url = next(
u for u in sandbox.exposed_urls if u.name == AGENT_SERVER
)
response = await httpx_client.get(
f'{agent_server_url.url}/api/conversations/count',
headers={'X-Session-API-Key': sandbox.session_api_key},
)
result = int(response.content)
return result
except Exception:
return 0
async def _delete_v0_conversation(conversation_id: str, user_id: str | None) -> bool:
"""Delete a V0 conversation using the legacy logic."""
conversation_store = await ConversationStoreImpl.get_instance(config, user_id)

View File

@@ -113,24 +113,6 @@ async def load_settings(
)
@app.post(
'/reset-settings',
responses={
410: {
'description': 'Reset settings functionality has been removed',
'model': dict,
}
},
)
async def reset_settings() -> JSONResponse:
"""Resets user settings. (Deprecated)"""
logger.warning('Deprecated endpoint /api/reset-settings called by user')
return JSONResponse(
status_code=status.HTTP_410_GONE,
content={'error': 'Reset settings functionality has been removed.'},
)
async def store_llm_settings(
settings: Settings, existing_settings: Settings
) -> Settings:

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import re
from enum import Enum
from pathlib import PurePosixPath
from typing import Annotated
@@ -23,6 +24,20 @@ from openhands.storage.data_models.secrets import Secrets
MARKETPLACE_PATH_PATTERN = re.compile(r'^[A-Za-z0-9_-]+(?:/[A-Za-z0-9_.-]+)*\.json$')
class SandboxGroupingStrategy(str, Enum):
"""Strategy for grouping conversations within sandboxes."""
NO_GROUPING = 'NO_GROUPING' # Default - each conversation gets its own sandbox
GROUP_BY_NEWEST = 'GROUP_BY_NEWEST' # Add to the most recently created sandbox
LEAST_RECENTLY_USED = (
'LEAST_RECENTLY_USED' # Add to the least recently used sandbox
)
FEWEST_CONVERSATIONS = (
'FEWEST_CONVERSATIONS' # Add to sandbox with fewest conversations
)
ADD_TO_ANY = 'ADD_TO_ANY' # Add to any available sandbox (first found)
class Settings(BaseModel):
"""Persisted settings for OpenHands sessions."""
@@ -58,10 +73,10 @@ class Settings(BaseModel):
git_user_name: str | None = None
git_user_email: str | None = None
v1_enabled: bool = True
# Path to the marketplace JSON file for public skills loading
# None = Load all skills without marketplace filtering
# String value = Use that marketplace path (e.g., 'marketplaces/default.json')
marketplace_path: str | None = None
sandbox_grouping_strategy: SandboxGroupingStrategy = (
SandboxGroupingStrategy.NO_GROUPING
)
model_config = ConfigDict(
validate_assignment=True,

View File

@@ -1,5 +1,6 @@
import os
import shutil
import threading
from openhands.core.logger import openhands_logger as logger
from openhands.storage.files import FileStore
@@ -23,8 +24,20 @@ class LocalFileStore(FileStore):
full_path = self.get_full_path(path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
mode = 'w' if isinstance(contents, str) else 'wb'
with open(full_path, mode) as f:
f.write(contents)
# Use atomic write: write to temp file, then rename
# This prevents race conditions where concurrent writes could corrupt the file
temp_path = f'{full_path}.tmp.{os.getpid()}.{threading.get_ident()}'
try:
with open(temp_path, mode) as f:
f.write(contents)
f.flush()
os.fsync(f.fileno())
os.replace(temp_path, full_path)
except Exception:
if os.path.exists(temp_path):
os.remove(temp_path)
raise
def read(self, path: str) -> str:
full_path = self.get_full_path(path)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import os
from enum import Enum
from functools import lru_cache
from pathlib import Path
@@ -9,6 +10,38 @@ _LEMONADE_PROVIDER_NAME = 'lemonade'
_LEMONADE_MODEL_PREFIX = 'lemonade/'
class StorageProvider(str, Enum):
"""Storage provider types for event and shared event storage."""
AWS = 'aws'
GCP = 'gcp'
FILESYSTEM = 'filesystem'
def get_storage_provider() -> StorageProvider:
"""Get the storage provider based on environment variables.
Determines the storage provider from environment configuration:
- SHARED_EVENT_STORAGE_PROVIDER: Primary setting, supports 'aws', 'gcp', 'google_cloud'
- FILE_STORE: Legacy fallback, supports 'google_cloud'
Returns:
StorageProvider: The configured storage provider (AWS, GCP, or FILESYSTEM)
"""
provider = os.environ.get('SHARED_EVENT_STORAGE_PROVIDER', '').lower()
# If not explicitly set, fall back to FILE_STORE
if not provider:
provider = os.environ.get('FILE_STORE', '').lower()
if provider == 'aws':
return StorageProvider.AWS
elif provider in ('gcp', 'google_cloud'):
return StorageProvider.GCP
else:
return StorageProvider.FILESYSTEM
@lru_cache(maxsize=1)
def is_running_in_docker() -> bool:
"""Best-effort detection for Docker containers."""

View File

@@ -22,6 +22,7 @@ OPENHANDS_MODELS = [
'openhands/gpt-5.2',
'openhands/minimax-m2.5',
'openhands/gemini-3-pro-preview',
'openhands/gemini-3.1-pro-preview',
'openhands/gemini-3-flash-preview',
'openhands/deepseek-chat',
'openhands/devstral-medium-2512',