mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 13:47:19 +08:00
V1 GitHub resolver fixes (#12199)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
@@ -14,6 +14,7 @@ from openhands.app_server.sandbox.sandbox_models import SandboxStatus
|
||||
from openhands.integrations.service_types import ProviderType
|
||||
from openhands.sdk.conversation.state import ConversationExecutionStatus
|
||||
from openhands.sdk.llm import MetricsSnapshot
|
||||
from openhands.sdk.utils.models import OpenHandsModel
|
||||
from openhands.storage.data_models.conversation_metadata import ConversationTrigger
|
||||
|
||||
|
||||
@@ -91,7 +92,7 @@ class AppConversationPage(BaseModel):
|
||||
next_page_id: str | None = None
|
||||
|
||||
|
||||
class AppConversationStartRequest(BaseModel):
|
||||
class AppConversationStartRequest(OpenHandsModel):
|
||||
"""Start conversation request object.
|
||||
|
||||
Although a user can go directly to the sandbox and start conversations, they
|
||||
@@ -142,7 +143,7 @@ class AppConversationStartTaskSortOrder(Enum):
|
||||
UPDATED_AT_DESC = 'UPDATED_AT_DESC'
|
||||
|
||||
|
||||
class AppConversationStartTask(BaseModel):
|
||||
class AppConversationStartTask(OpenHandsModel):
|
||||
"""Object describing the start process for an app conversation.
|
||||
|
||||
Because starting an app conversation can be slow (And can involve starting a sandbox),
|
||||
@@ -167,7 +168,7 @@ class AppConversationStartTask(BaseModel):
|
||||
updated_at: datetime = Field(default_factory=utc_now)
|
||||
|
||||
|
||||
class AppConversationStartTaskPage(BaseModel):
|
||||
class AppConversationStartTaskPage(OpenHandsModel):
|
||||
items: list[AppConversationStartTask]
|
||||
next_page_id: str | None = None
|
||||
|
||||
|
||||
@@ -135,7 +135,7 @@ class SQLAppConversationStartTaskService(AppConversationStartTaskService):
|
||||
if has_more:
|
||||
rows = rows[:limit]
|
||||
|
||||
items = [AppConversationStartTask(**row2dict(row)) for row in rows]
|
||||
items = [AppConversationStartTask.model_validate(row2dict(row)) for row in rows]
|
||||
|
||||
# Calculate next page ID
|
||||
next_page_id = None
|
||||
@@ -196,7 +196,7 @@ class SQLAppConversationStartTaskService(AppConversationStartTaskService):
|
||||
# Return tasks in the same order as requested, with None for missing ones
|
||||
return [
|
||||
(
|
||||
AppConversationStartTask(**row2dict(tasks_by_id[task_id]))
|
||||
AppConversationStartTask.model_validate(row2dict(tasks_by_id[task_id]))
|
||||
if task_id in tasks_by_id
|
||||
else None
|
||||
)
|
||||
@@ -218,7 +218,7 @@ class SQLAppConversationStartTaskService(AppConversationStartTaskService):
|
||||
result = await self.session.execute(query)
|
||||
stored_task = result.scalar_one_or_none()
|
||||
if stored_task:
|
||||
return AppConversationStartTask(**row2dict(stored_task))
|
||||
return AppConversationStartTask.model_validate(row2dict(stored_task))
|
||||
return None
|
||||
|
||||
async def save_app_conversation_start_task(
|
||||
|
||||
@@ -9,7 +9,6 @@ with the discriminated union system used by Pydantic for validation.
|
||||
|
||||
# Import base classes and processors without circular dependencies
|
||||
from .event_callback_models import EventCallbackProcessor, LoggingCallbackProcessor
|
||||
from .github_v1_callback_processor import GithubV1CallbackProcessor
|
||||
|
||||
# Note: SetTitleCallbackProcessor is not imported here to avoid circular imports
|
||||
# It will be registered when imported elsewhere in the application
|
||||
@@ -17,5 +16,4 @@ from .github_v1_callback_processor import GithubV1CallbackProcessor
|
||||
__all__ = [
|
||||
'EventCallbackProcessor',
|
||||
'LoggingCallbackProcessor',
|
||||
'GithubV1CallbackProcessor',
|
||||
]
|
||||
|
||||
@@ -1,295 +0,0 @@
|
||||
import logging
|
||||
import os
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
import httpx
|
||||
from github import Auth, Github, GithubIntegration
|
||||
from pydantic import Field
|
||||
|
||||
from openhands.agent_server.models import AskAgentRequest, AskAgentResponse
|
||||
from openhands.app_server.event_callback.event_callback_models import (
|
||||
EventCallback,
|
||||
EventCallbackProcessor,
|
||||
)
|
||||
from openhands.app_server.event_callback.event_callback_result_models import (
|
||||
EventCallbackResult,
|
||||
EventCallbackResultStatus,
|
||||
)
|
||||
from openhands.app_server.event_callback.util import (
|
||||
ensure_conversation_found,
|
||||
ensure_running_sandbox,
|
||||
get_agent_server_url_from_sandbox,
|
||||
get_conversation_url,
|
||||
get_prompt_template,
|
||||
)
|
||||
from openhands.sdk import Event
|
||||
from openhands.sdk.event import ConversationStateUpdateEvent
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class GithubV1CallbackProcessor(EventCallbackProcessor):
|
||||
"""Callback processor for GitHub V1 integrations."""
|
||||
|
||||
github_view_data: dict[str, Any] = Field(default_factory=dict)
|
||||
should_request_summary: bool = Field(default=True)
|
||||
should_extract: bool = Field(default=True)
|
||||
inline_pr_comment: bool = Field(default=False)
|
||||
|
||||
async def __call__(
|
||||
self,
|
||||
conversation_id: UUID,
|
||||
callback: EventCallback,
|
||||
event: Event,
|
||||
) -> EventCallbackResult | None:
|
||||
"""Process events for GitHub V1 integration."""
|
||||
|
||||
# Only handle ConversationStateUpdateEvent
|
||||
if not isinstance(event, ConversationStateUpdateEvent):
|
||||
return None
|
||||
|
||||
# Only act when execution has finished
|
||||
if not (event.key == 'execution_status' and event.value == 'finished'):
|
||||
return None
|
||||
|
||||
_logger.info('[GitHub V1] Callback agent state was %s', event)
|
||||
_logger.info(
|
||||
'[GitHub V1] Should request summary: %s', self.should_request_summary
|
||||
)
|
||||
|
||||
if not self.should_request_summary:
|
||||
return None
|
||||
|
||||
self.should_request_summary = False
|
||||
|
||||
try:
|
||||
summary = await self._request_summary(conversation_id)
|
||||
await self._post_summary_to_github(summary)
|
||||
|
||||
return EventCallbackResult(
|
||||
status=EventCallbackResultStatus.SUCCESS,
|
||||
event_callback_id=callback.id,
|
||||
event_id=event.id,
|
||||
conversation_id=conversation_id,
|
||||
detail=summary,
|
||||
)
|
||||
except Exception as e:
|
||||
_logger.exception('[GitHub V1] Error processing callback: %s', e)
|
||||
|
||||
# Only try to post error to GitHub if we have basic requirements
|
||||
try:
|
||||
# Check if we have installation ID and credentials before posting
|
||||
if (
|
||||
self.github_view_data.get('installation_id')
|
||||
and os.getenv('GITHUB_APP_CLIENT_ID')
|
||||
and os.getenv('GITHUB_APP_PRIVATE_KEY')
|
||||
):
|
||||
await self._post_summary_to_github(
|
||||
f'OpenHands encountered an error: **{str(e)}**.\n\n'
|
||||
f'[See the conversation]({get_conversation_url().format(conversation_id)})'
|
||||
'for more information.'
|
||||
)
|
||||
except Exception as post_error:
|
||||
_logger.warning(
|
||||
'[GitHub V1] Failed to post error message to GitHub: %s', post_error
|
||||
)
|
||||
|
||||
return EventCallbackResult(
|
||||
status=EventCallbackResultStatus.ERROR,
|
||||
event_callback_id=callback.id,
|
||||
event_id=event.id,
|
||||
conversation_id=conversation_id,
|
||||
detail=str(e),
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# GitHub helpers
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def _get_installation_access_token(self) -> str:
|
||||
installation_id = self.github_view_data.get('installation_id')
|
||||
|
||||
if not installation_id:
|
||||
raise ValueError(
|
||||
f'Missing installation ID for GitHub payload: {self.github_view_data}'
|
||||
)
|
||||
|
||||
github_app_client_id = os.getenv('GITHUB_APP_CLIENT_ID', '').strip()
|
||||
github_app_private_key = os.getenv('GITHUB_APP_PRIVATE_KEY', '').replace(
|
||||
'\\n', '\n'
|
||||
)
|
||||
|
||||
if not github_app_client_id or not github_app_private_key:
|
||||
raise ValueError('GitHub App credentials are not configured')
|
||||
|
||||
github_integration = GithubIntegration(
|
||||
auth=Auth.AppAuth(github_app_client_id, github_app_private_key),
|
||||
)
|
||||
token_data = github_integration.get_access_token(installation_id)
|
||||
return token_data.token
|
||||
|
||||
async def _post_summary_to_github(self, summary: str) -> None:
|
||||
"""Post a summary comment to the configured GitHub issue."""
|
||||
installation_token = self._get_installation_access_token()
|
||||
|
||||
if not installation_token:
|
||||
raise RuntimeError('Missing GitHub credentials')
|
||||
|
||||
full_repo_name = self.github_view_data['full_repo_name']
|
||||
issue_number = self.github_view_data['issue_number']
|
||||
|
||||
if self.inline_pr_comment:
|
||||
with Github(installation_token) as github_client:
|
||||
repo = github_client.get_repo(full_repo_name)
|
||||
pr = repo.get_pull(issue_number)
|
||||
pr.create_review_comment_reply(
|
||||
comment_id=self.github_view_data.get('comment_id', ''), body=summary
|
||||
)
|
||||
return
|
||||
|
||||
with Github(installation_token) as github_client:
|
||||
repo = github_client.get_repo(full_repo_name)
|
||||
issue = repo.get_issue(number=issue_number)
|
||||
issue.create_comment(summary)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Agent / sandbox helpers
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def _ask_question(
|
||||
self,
|
||||
httpx_client: httpx.AsyncClient,
|
||||
agent_server_url: str,
|
||||
conversation_id: UUID,
|
||||
session_api_key: str,
|
||||
message_content: str,
|
||||
) -> str:
|
||||
"""Send a message to the agent server via the V1 API and return response text."""
|
||||
send_message_request = AskAgentRequest(question=message_content)
|
||||
|
||||
url = (
|
||||
f'{agent_server_url.rstrip("/")}'
|
||||
f'/api/conversations/{conversation_id}/ask_agent'
|
||||
)
|
||||
headers = {'X-Session-API-Key': session_api_key}
|
||||
payload = send_message_request.model_dump()
|
||||
|
||||
try:
|
||||
response = await httpx_client.post(
|
||||
url,
|
||||
json=payload,
|
||||
headers=headers,
|
||||
timeout=30.0,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
agent_response = AskAgentResponse.model_validate(response.json())
|
||||
return agent_response.response
|
||||
|
||||
except httpx.HTTPStatusError as e:
|
||||
error_detail = f'HTTP {e.response.status_code} error'
|
||||
try:
|
||||
error_body = e.response.text
|
||||
if error_body:
|
||||
error_detail += f': {error_body}'
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
|
||||
_logger.error(
|
||||
'[GitHub V1] HTTP error sending message to %s: %s. '
|
||||
'Request payload: %s. Response headers: %s',
|
||||
url,
|
||||
error_detail,
|
||||
payload,
|
||||
dict(e.response.headers),
|
||||
exc_info=True,
|
||||
)
|
||||
raise Exception(f'Failed to send message to agent server: {error_detail}')
|
||||
|
||||
except httpx.TimeoutException:
|
||||
error_detail = f'Request timeout after 30 seconds to {url}'
|
||||
_logger.error(
|
||||
'[GitHub V1] %s. Request payload: %s',
|
||||
error_detail,
|
||||
payload,
|
||||
exc_info=True,
|
||||
)
|
||||
raise Exception(error_detail)
|
||||
|
||||
except httpx.RequestError as e:
|
||||
error_detail = f'Request error to {url}: {str(e)}'
|
||||
_logger.error(
|
||||
'[GitHub V1] %s. Request payload: %s',
|
||||
error_detail,
|
||||
payload,
|
||||
exc_info=True,
|
||||
)
|
||||
raise Exception(error_detail)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Summary orchestration
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
async def _request_summary(self, conversation_id: UUID) -> str:
|
||||
"""
|
||||
Ask the agent to produce a summary of its work and return the agent response.
|
||||
|
||||
NOTE: This method now returns a string (the agent server's response text)
|
||||
and raises exceptions on errors. The wrapping into EventCallbackResult
|
||||
is handled by __call__.
|
||||
"""
|
||||
# Import services within the method to avoid circular imports
|
||||
from openhands.app_server.config import (
|
||||
get_app_conversation_info_service,
|
||||
get_httpx_client,
|
||||
get_sandbox_service,
|
||||
)
|
||||
from openhands.app_server.services.injector import InjectorState
|
||||
from openhands.app_server.user.specifiy_user_context import (
|
||||
ADMIN,
|
||||
USER_CONTEXT_ATTR,
|
||||
)
|
||||
|
||||
# Create injector state for dependency injection
|
||||
state = InjectorState()
|
||||
setattr(state, USER_CONTEXT_ATTR, ADMIN)
|
||||
|
||||
async with (
|
||||
get_app_conversation_info_service(state) as app_conversation_info_service,
|
||||
get_sandbox_service(state) as sandbox_service,
|
||||
get_httpx_client(state) as httpx_client,
|
||||
):
|
||||
# 1. Conversation lookup
|
||||
app_conversation_info = ensure_conversation_found(
|
||||
await app_conversation_info_service.get_app_conversation_info(
|
||||
conversation_id
|
||||
),
|
||||
conversation_id,
|
||||
)
|
||||
|
||||
# 2. Sandbox lookup + validation
|
||||
sandbox = ensure_running_sandbox(
|
||||
await sandbox_service.get_sandbox(app_conversation_info.sandbox_id),
|
||||
app_conversation_info.sandbox_id,
|
||||
)
|
||||
|
||||
assert sandbox.session_api_key is not None, (
|
||||
f'No session API key for sandbox: {sandbox.id}'
|
||||
)
|
||||
|
||||
# 3. URL + instruction
|
||||
agent_server_url = get_agent_server_url_from_sandbox(sandbox)
|
||||
agent_server_url = get_agent_server_url_from_sandbox(sandbox)
|
||||
|
||||
# Prepare message based on agent state
|
||||
message_content = get_prompt_template('summary_prompt.j2')
|
||||
|
||||
# Ask the agent and return the response text
|
||||
return await self._ask_question(
|
||||
httpx_client=httpx_client,
|
||||
agent_server_url=agent_server_url,
|
||||
conversation_id=conversation_id,
|
||||
session_api_key=sandbox.session_api_key,
|
||||
message_content=message_content,
|
||||
)
|
||||
@@ -90,7 +90,7 @@ class SQLEventCallbackService(EventCallbackService):
|
||||
self.db_session.add(stored_callback)
|
||||
await self.db_session.commit()
|
||||
await self.db_session.refresh(stored_callback)
|
||||
return EventCallback(**row2dict(stored_callback))
|
||||
return EventCallback.model_validate(row2dict(stored_callback))
|
||||
|
||||
async def get_event_callback(self, id: UUID) -> EventCallback | None:
|
||||
"""Get a single event callback, returning None if not found."""
|
||||
@@ -98,7 +98,7 @@ class SQLEventCallbackService(EventCallbackService):
|
||||
result = await self.db_session.execute(stmt)
|
||||
stored_callback = result.scalar_one_or_none()
|
||||
if stored_callback:
|
||||
return EventCallback(**row2dict(stored_callback))
|
||||
return EventCallback.model_validate(row2dict(stored_callback))
|
||||
return None
|
||||
|
||||
async def delete_event_callback(self, id: UUID) -> bool:
|
||||
@@ -173,7 +173,9 @@ class SQLEventCallbackService(EventCallbackService):
|
||||
next_page_id = str(offset + limit)
|
||||
|
||||
# Convert stored callbacks to domain models
|
||||
callbacks = [EventCallback(**row2dict(cb)) for cb in stored_callbacks]
|
||||
callbacks = [
|
||||
EventCallback.model_validate(row2dict(cb)) for cb in stored_callbacks
|
||||
]
|
||||
return EventCallbackPage(items=callbacks, next_page_id=next_page_id)
|
||||
|
||||
async def save_event_callback(self, event_callback: EventCallback) -> EventCallback:
|
||||
@@ -202,7 +204,9 @@ class SQLEventCallbackService(EventCallbackService):
|
||||
result = await self.db_session.execute(query)
|
||||
stored_callbacks = result.scalars().all()
|
||||
if stored_callbacks:
|
||||
callbacks = [EventCallback(**row2dict(cb)) for cb in stored_callbacks]
|
||||
callbacks = [
|
||||
EventCallback.model_validate(row2dict(cb)) for cb in stored_callbacks
|
||||
]
|
||||
await asyncio.gather(
|
||||
*[
|
||||
self.execute_callback(conversation_id, callback, event)
|
||||
|
||||
@@ -18,15 +18,6 @@ if TYPE_CHECKING:
|
||||
)
|
||||
|
||||
|
||||
def get_conversation_url() -> str:
|
||||
from openhands.app_server.config import get_global_config
|
||||
|
||||
web_url = get_global_config().web_url
|
||||
conversation_prefix = 'conversations/{}'
|
||||
conversation_url = f'{web_url}/{conversation_prefix}'
|
||||
return conversation_url
|
||||
|
||||
|
||||
def ensure_conversation_found(
|
||||
app_conversation_info: AppConversationInfo | None, conversation_id: UUID
|
||||
) -> AppConversationInfo:
|
||||
@@ -68,14 +59,3 @@ def get_agent_server_url_from_sandbox(sandbox: SandboxInfo) -> str:
|
||||
) from None
|
||||
|
||||
return replace_localhost_hostname_for_docker(agent_server_url)
|
||||
|
||||
|
||||
def get_prompt_template(template_name: str) -> str:
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
jinja_env = Environment(
|
||||
loader=FileSystemLoader('openhands/integrations/templates/resolver/')
|
||||
)
|
||||
summary_instruction_template = jinja_env.get_template(template_name)
|
||||
summary_instruction = summary_instruction_template.render()
|
||||
return summary_instruction
|
||||
|
||||
@@ -790,7 +790,7 @@ class RemoteSandboxServiceInjector(SandboxServiceInjector):
|
||||
# This is primarily used for local development rather than production
|
||||
config = get_global_config()
|
||||
web_url = config.web_url
|
||||
if web_url is None:
|
||||
if web_url is None or 'localhost' in web_url:
|
||||
global polling_task
|
||||
if polling_task is None:
|
||||
polling_task = asyncio.create_task(
|
||||
|
||||
Reference in New Issue
Block a user