# -*- coding: utf-8 -*- # @Time : 2025/1/1 # @Author : wenshao # @Email : wenshaoguo1026@gmail.com # @Project : browser-use-webui # @FileName: webui.py import pdb import logging from dotenv import load_dotenv load_dotenv() import os import glob import asyncio import argparse import os logger = logging.getLogger(__name__) import gradio as gr from browser_use.agent.service import Agent from playwright.async_api import async_playwright from browser_use.browser.browser import Browser, BrowserConfig from browser_use.browser.context import ( BrowserContextConfig, BrowserContextWindowSize, ) from playwright.async_api import async_playwright from src.utils.agent_state import AgentState from src.utils import utils from src.agent.custom_agent import CustomAgent from src.browser.custom_browser import CustomBrowser from src.agent.custom_prompts import CustomSystemPrompt from src.browser.config import BrowserPersistenceConfig from src.browser.custom_context import BrowserContextConfig, CustomBrowserContext from src.controller.custom_controller import CustomController from gradio.themes import Citrus, Default, Glass, Monochrome, Ocean, Origin, Soft, Base from src.utils.utils import update_model_dropdown, get_latest_files, capture_screenshot from dotenv import load_dotenv load_dotenv() # Global variables for persistence _global_browser = None _global_browser_context = None # Create the global agent state instance _global_agent_state = AgentState() async def stop_agent(): """Request the agent to stop and update UI with enhanced feedback""" global _global_agent_state, _global_browser_context, _global_browser try: # Request stop _global_agent_state.request_stop() # Update UI immediately message = "Stop requested - the agent will halt at the next safe point" logger.info(f"đ {message}") # Return UI updates return ( message, # errors_output gr.update(value="Stopping...", interactive=False), # stop_button gr.update(interactive=False), # run_button ) except Exception as e: error_msg = f"Error during stop: {str(e)}" logger.error(error_msg) return ( error_msg, gr.update(value="Stop", interactive=True), gr.update(interactive=True) ) async def run_browser_agent( agent_type, llm_provider, llm_model_name, llm_temperature, llm_base_url, llm_api_key, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, enable_recording, task, add_infos, max_steps, use_vision, max_actions_per_step, tool_call_in_content ): global _global_agent_state _global_agent_state.clear_stop() # Clear any previous stop requests try: # Disable recording if the checkbox is unchecked if not enable_recording: save_recording_path = None # Ensure the recording directory exists if recording is enabled if save_recording_path: os.makedirs(save_recording_path, exist_ok=True) # Get the list of existing videos before the agent runs existing_videos = set() if save_recording_path: existing_videos = set( glob.glob(os.path.join(save_recording_path, "*.[mM][pP]4")) + glob.glob(os.path.join(save_recording_path, "*.[wW][eE][bB][mM]")) ) # Run the agent llm = utils.get_llm_model( provider=llm_provider, model_name=llm_model_name, temperature=llm_temperature, base_url=llm_base_url, api_key=llm_api_key, ) if agent_type == "org": final_result, errors, model_actions, model_thoughts, trace_file, history_file = await run_org_agent( llm=llm, use_own_browser=use_own_browser, keep_browser_open=keep_browser_open, headless=headless, disable_security=disable_security, window_w=window_w, window_h=window_h, save_recording_path=save_recording_path, save_agent_history_path=save_agent_history_path, save_trace_path=save_trace_path, task=task, max_steps=max_steps, use_vision=use_vision, max_actions_per_step=max_actions_per_step, tool_call_in_content=tool_call_in_content ) elif agent_type == "custom": final_result, errors, model_actions, model_thoughts, trace_file, history_file = await run_custom_agent( llm=llm, use_own_browser=use_own_browser, keep_browser_open=keep_browser_open, headless=headless, disable_security=disable_security, window_w=window_w, window_h=window_h, save_recording_path=save_recording_path, save_agent_history_path=save_agent_history_path, save_trace_path=save_trace_path, task=task, add_infos=add_infos, max_steps=max_steps, use_vision=use_vision, max_actions_per_step=max_actions_per_step, tool_call_in_content=tool_call_in_content ) else: raise ValueError(f"Invalid agent type: {agent_type}") # Get the list of videos after the agent runs (if recording is enabled) latest_video = None if save_recording_path: new_videos = set( glob.glob(os.path.join(save_recording_path, "*.[mM][pP]4")) + glob.glob(os.path.join(save_recording_path, "*.[wW][eE][bB][mM]")) ) if new_videos - existing_videos: latest_video = list(new_videos - existing_videos)[0] # Get the first new video return ( final_result, errors, model_actions, model_thoughts, latest_video, trace_file, history_file, gr.update(value="Stop", interactive=True), # Re-enable stop button gr.update(value="Run", interactive=True) # Re-enable run button ) except Exception as e: import traceback traceback.print_exc() errors = str(e) + "\n" + traceback.format_exc() return ( '', # final_result errors, # errors '', # model_actions '', # model_thoughts None, # latest_video None, # history_file None, # trace_file gr.update(value="Stop", interactive=True), # Re-enable stop button gr.update(value="Run", interactive=True) # Re-enable run button ) async def run_org_agent( llm, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, task, max_steps, use_vision, max_actions_per_step, tool_call_in_content ): try: global _global_browser, _global_browser_context, _global_agent_state # Clear any previous stop request _global_agent_state.clear_stop() if use_own_browser: chrome_path = os.getenv("CHROME_PATH", None) if chrome_path == "": chrome_path = None else: chrome_path = None if _global_browser is None: _global_browser = Browser( config=BrowserConfig( headless=headless, disable_security=disable_security, chrome_instance_path=chrome_path, extra_chromium_args=[f"--window-size={window_w},{window_h}"], ) ) if _global_browser_context is None: _global_browser_context = await _global_browser.new_context( config=BrowserContextConfig( trace_path=save_trace_path if save_trace_path else None, save_recording_path=save_recording_path if save_recording_path else None, no_viewport=False, browser_window_size=BrowserContextWindowSize( width=window_w, height=window_h ), ) ) agent = Agent( task=task, llm=llm, use_vision=use_vision, browser=_global_browser, browser_context=_global_browser_context, max_actions_per_step=max_actions_per_step, tool_call_in_content=tool_call_in_content ) history = await agent.run(max_steps=max_steps) history_file = os.path.join(save_agent_history_path, f"{agent.agent_id}.json") agent.save_history(history_file) final_result = history.final_result() errors = history.errors() model_actions = history.model_actions() model_thoughts = history.model_thoughts() trace_file = get_latest_files(save_trace_path) return final_result, errors, model_actions, model_thoughts, trace_file.get('.zip'), history_file except Exception as e: import traceback traceback.print_exc() errors = str(e) + "\n" + traceback.format_exc() return '', errors, '', '', None finally: # Handle cleanup based on persistence configuration if not keep_browser_open: if _global_browser_context: await _global_browser_context.close() _global_browser_context = None if _global_browser: await _global_browser.close() _global_browser = None async def run_custom_agent( llm, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, task, add_infos, max_steps, use_vision, max_actions_per_step, tool_call_in_content ): try: global _global_browser, _global_browser_context, _global_agent_state # Clear any previous stop request _global_agent_state.clear_stop() if use_own_browser: chrome_path = os.getenv("CHROME_PATH", None) if chrome_path == "": chrome_path = None else: chrome_path = None controller = CustomController() # Initialize global browser if needed if _global_browser is None: _global_browser = CustomBrowser( config=BrowserConfig( headless=headless, disable_security=disable_security, chrome_instance_path=chrome_path, extra_chromium_args=[f"--window-size={window_w},{window_h}"], ) ) if _global_browser_context is None: _global_browser_context = await _global_browser.new_context( config=BrowserContextConfig( trace_path=save_trace_path if save_trace_path else None, save_recording_path=save_recording_path if save_recording_path else None, no_viewport=False, browser_window_size=BrowserContextWindowSize( width=window_w, height=window_h ), ) ) # Create and run agent agent = CustomAgent( task=task, add_infos=add_infos, use_vision=use_vision, llm=llm, browser=_global_browser, browser_context=_global_browser_context, controller=controller, system_prompt_class=CustomSystemPrompt, max_actions_per_step=max_actions_per_step, tool_call_in_content=tool_call_in_content, agent_state=_global_agent_state ) history = await agent.run(max_steps=max_steps) history_file = os.path.join(save_agent_history_path, f"{agent.agent_id}.json") agent.save_history(history_file) final_result = history.final_result() errors = history.errors() model_actions = history.model_actions() model_thoughts = history.model_thoughts() trace_file = get_latest_files(save_trace_path) return final_result, errors, model_actions, model_thoughts, trace_file.get('.zip'), history_file except Exception as e: import traceback traceback.print_exc() errors = str(e) + "\n" + traceback.format_exc() return '', errors, '', '', None, None finally: # Handle cleanup based on persistence configuration if not keep_browser_open: if _global_browser_context: await _global_browser_context.close() _global_browser_context = None if _global_browser: await _global_browser.close() _global_browser = None async def run_with_stream( agent_type, llm_provider, llm_model_name, llm_temperature, llm_base_url, llm_api_key, use_own_browser, keep_browser_open, headless, disable_security, window_w, window_h, save_recording_path, save_agent_history_path, save_trace_path, enable_recording, task, add_infos, max_steps, use_vision, max_actions_per_step, tool_call_in_content ): stream_vw = 80 stream_vh = int(80 * window_h // window_w) if not headless: result = await run_browser_agent( agent_type=agent_type, llm_provider=llm_provider, llm_model_name=llm_model_name, llm_temperature=llm_temperature, llm_base_url=llm_base_url, llm_api_key=llm_api_key, use_own_browser=use_own_browser, keep_browser_open=keep_browser_open, headless=headless, disable_security=disable_security, window_w=window_w, window_h=window_h, save_recording_path=save_recording_path, save_agent_history_path=save_agent_history_path, save_trace_path=save_trace_path, enable_recording=enable_recording, task=task, add_infos=add_infos, max_steps=max_steps, use_vision=use_vision, max_actions_per_step=max_actions_per_step, tool_call_in_content=tool_call_in_content ) # Add HTML content at the start of the result array html_content = f"