mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
Co-authored-by: openhands <openhands@all-hands.dev> Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
224 lines
9.7 KiB
Python
224 lines
9.7 KiB
Python
import base64
|
|
import datetime
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from browsergym.utils.obs import flatten_axtree_to_str
|
|
from PIL import Image
|
|
|
|
from openhands.core.exceptions import BrowserUnavailableException
|
|
from openhands.core.schema import ActionType
|
|
from openhands.events.action import BrowseInteractiveAction, BrowseURLAction
|
|
from openhands.events.observation import BrowserOutputObservation
|
|
from openhands.runtime.browser.base64 import png_base64_url_to_image
|
|
from openhands.runtime.browser.browser_env import BrowserEnv
|
|
from openhands.utils.async_utils import call_sync_from_async
|
|
|
|
|
|
def get_axtree_str(
|
|
axtree_object: dict[str, Any],
|
|
extra_element_properties: dict[str, Any],
|
|
filter_visible_only: bool = False,
|
|
) -> str:
|
|
cur_axtree_txt = flatten_axtree_to_str(
|
|
axtree_object,
|
|
extra_properties=extra_element_properties,
|
|
with_clickable=True,
|
|
skip_generic=False,
|
|
filter_visible_only=filter_visible_only,
|
|
)
|
|
return str(cur_axtree_txt)
|
|
|
|
|
|
def get_agent_obs_text(obs: BrowserOutputObservation) -> str:
|
|
"""Get a concise text that will be shown to the agent."""
|
|
if obs.trigger_by_action == ActionType.BROWSE_INTERACTIVE:
|
|
text = f'[Current URL: {obs.url}]\n'
|
|
text += f'[Focused element bid: {obs.focused_element_bid}]\n'
|
|
|
|
# Add screenshot path information if available
|
|
if obs.screenshot_path:
|
|
text += f'[Screenshot saved to: {obs.screenshot_path}]\n'
|
|
|
|
text += '\n'
|
|
|
|
if obs.error:
|
|
text += (
|
|
'================ BEGIN error message ===============\n'
|
|
'The following error occurred when executing the last action:\n'
|
|
f'{obs.last_browser_action_error}\n'
|
|
'================ END error message ===============\n'
|
|
)
|
|
else:
|
|
text += '[Action executed successfully.]\n'
|
|
try:
|
|
# We do not filter visible only here because we want to show the full content
|
|
# of the web page to the agent for simplicity.
|
|
# FIXME: handle the case when the web page is too large
|
|
cur_axtree_txt = get_axtree_str(
|
|
obs.axtree_object,
|
|
obs.extra_element_properties,
|
|
filter_visible_only=obs.filter_visible_only,
|
|
)
|
|
if not obs.filter_visible_only:
|
|
text += (
|
|
f'Accessibility tree of the COMPLETE webpage:\nNote: [bid] is the unique alpha-numeric identifier at the beginning of lines for each element in the AXTree. Always use bid to refer to elements in your actions.\n'
|
|
f'============== BEGIN accessibility tree ==============\n'
|
|
f'{cur_axtree_txt}\n'
|
|
f'============== END accessibility tree ==============\n'
|
|
)
|
|
else:
|
|
text += (
|
|
f'Accessibility tree of the VISIBLE portion of the webpage (accessibility tree of complete webpage is too large and you may need to scroll to view remaining portion of the webpage):\nNote: [bid] is the unique alpha-numeric identifier at the beginning of lines for each element in the AXTree. Always use bid to refer to elements in your actions.\n'
|
|
f'============== BEGIN accessibility tree ==============\n'
|
|
f'{cur_axtree_txt}\n'
|
|
f'============== END accessibility tree ==============\n'
|
|
)
|
|
except Exception as e:
|
|
text += f'\n[Error encountered when processing the accessibility tree: {e}]'
|
|
return text
|
|
|
|
elif obs.trigger_by_action == ActionType.BROWSE:
|
|
text = f'[Current URL: {obs.url}]\n'
|
|
|
|
if obs.error:
|
|
text += (
|
|
'================ BEGIN error message ===============\n'
|
|
'The following error occurred when trying to visit the URL:\n'
|
|
f'{obs.last_browser_action_error}\n'
|
|
'================ END error message ===============\n'
|
|
)
|
|
text += '============== BEGIN webpage content ==============\n'
|
|
text += obs.content
|
|
text += '\n============== END webpage content ==============\n'
|
|
return text
|
|
else:
|
|
raise ValueError(f'Invalid trigger_by_action: {obs.trigger_by_action}')
|
|
|
|
|
|
async def browse(
|
|
action: BrowseURLAction | BrowseInteractiveAction,
|
|
browser: BrowserEnv | None,
|
|
workspace_dir: str | None = None,
|
|
) -> BrowserOutputObservation:
|
|
if browser is None:
|
|
raise BrowserUnavailableException()
|
|
|
|
if isinstance(action, BrowseURLAction):
|
|
# legacy BrowseURLAction
|
|
asked_url = action.url
|
|
if not asked_url.startswith('http'):
|
|
asked_url = os.path.abspath(os.curdir) + action.url
|
|
action_str = f'goto("{asked_url}")'
|
|
|
|
elif isinstance(action, BrowseInteractiveAction):
|
|
# new BrowseInteractiveAction, supports full featured BrowserGym actions
|
|
# action in BrowserGym: see https://github.com/ServiceNow/BrowserGym/blob/main/core/src/browsergym/core/action/functions.py
|
|
action_str = action.browser_actions
|
|
else:
|
|
raise ValueError(f'Invalid action type: {action.action}')
|
|
|
|
try:
|
|
# obs provided by BrowserGym: see https://github.com/ServiceNow/BrowserGym/blob/main/core/src/browsergym/core/env.py#L396
|
|
obs = await call_sync_from_async(browser.step, action_str)
|
|
|
|
# Save screenshot if workspace_dir is provided
|
|
screenshot_path = None
|
|
if workspace_dir is not None and obs.get('screenshot'):
|
|
# Create screenshots directory if it doesn't exist
|
|
screenshots_dir = Path(workspace_dir) / '.browser_screenshots'
|
|
screenshots_dir.mkdir(exist_ok=True)
|
|
|
|
# Generate a filename based on timestamp
|
|
timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S_%f')
|
|
screenshot_filename = f'screenshot_{timestamp}.png'
|
|
screenshot_path = str(screenshots_dir / screenshot_filename)
|
|
|
|
# Direct image saving from base64 data without using PIL's Image.open
|
|
# This approach bypasses potential encoding issues that might occur when
|
|
# converting between different image representations, ensuring the raw PNG
|
|
# data from the browser is saved directly to disk.
|
|
|
|
# Extract the base64 data
|
|
base64_data = obs.get('screenshot', '')
|
|
if ',' in base64_data:
|
|
base64_data = base64_data.split(',')[1]
|
|
|
|
try:
|
|
# Decode base64 directly to binary
|
|
image_data = base64.b64decode(base64_data)
|
|
|
|
# Write binary data directly to file
|
|
with open(screenshot_path, 'wb') as f:
|
|
f.write(image_data)
|
|
|
|
# Verify the image was saved correctly by opening it
|
|
# This is just a verification step and can be removed in production
|
|
Image.open(screenshot_path).verify()
|
|
except Exception:
|
|
# If direct saving fails, fall back to the original method
|
|
image = png_base64_url_to_image(obs.get('screenshot'))
|
|
image.save(screenshot_path, format='PNG', optimize=True)
|
|
|
|
# Create the observation with all data
|
|
observation = BrowserOutputObservation(
|
|
content=obs['text_content'], # text content of the page
|
|
url=obs.get('url', ''), # URL of the page
|
|
screenshot=obs.get('screenshot', None), # base64-encoded screenshot, png
|
|
screenshot_path=screenshot_path, # path to saved screenshot file
|
|
set_of_marks=obs.get(
|
|
'set_of_marks', None
|
|
), # base64-encoded Set-of-Marks annotated screenshot, png,
|
|
goal_image_urls=obs.get('image_content', []),
|
|
open_pages_urls=obs.get('open_pages_urls', []), # list of open pages
|
|
active_page_index=obs.get(
|
|
'active_page_index', -1
|
|
), # index of the active page
|
|
axtree_object=obs.get('axtree_object', {}), # accessibility tree object
|
|
extra_element_properties=obs.get('extra_element_properties', {}),
|
|
focused_element_bid=obs.get(
|
|
'focused_element_bid', None
|
|
), # focused element bid
|
|
last_browser_action=obs.get(
|
|
'last_action', ''
|
|
), # last browser env action performed
|
|
last_browser_action_error=obs.get('last_action_error', ''),
|
|
error=True if obs.get('last_action_error', '') else False, # error flag
|
|
trigger_by_action=action.action,
|
|
)
|
|
|
|
# Process the content first using the axtree_object
|
|
observation.content = get_agent_obs_text(observation)
|
|
|
|
# If return_axtree is False, remove the axtree_object to save space
|
|
if not action.return_axtree:
|
|
observation.dom_object = {}
|
|
observation.axtree_object = {}
|
|
observation.extra_element_properties = {}
|
|
|
|
return observation
|
|
except Exception as e:
|
|
error_message = str(e)
|
|
error_url = asked_url if action.action == ActionType.BROWSE else ''
|
|
|
|
# Create error observation
|
|
observation = BrowserOutputObservation(
|
|
content=error_message,
|
|
screenshot='',
|
|
screenshot_path=None,
|
|
error=True,
|
|
last_browser_action_error=error_message,
|
|
url=error_url,
|
|
trigger_by_action=action.action,
|
|
)
|
|
|
|
# Process the content using get_agent_obs_text regardless of return_axtree value
|
|
try:
|
|
observation.content = get_agent_obs_text(observation)
|
|
except Exception:
|
|
# If get_agent_obs_text fails, keep the original error message
|
|
pass
|
|
|
|
return observation
|