From a1ec7ad012ee285a5eecde31fa433b4099fd9cdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20M=C3=BCller?= <67061560+MagMueller@users.noreply.github.com> Date: Fri, 2 May 2025 13:21:39 +0800 Subject: [PATCH 1/3] Update browser-use package to version 0.1.42 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a9f6c87..01fe29a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -browser-use==0.1.41 +browser-use==0.1.42 pyperclip==1.9.0 gradio==5.27.0 json-repair From 74bea17eb1f48213f5c0d99cd5a18326bd747372 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20M=C3=BCller?= <67061560+MagMueller@users.noreply.github.com> Date: Fri, 2 May 2025 13:21:47 +0800 Subject: [PATCH 2/3] Refactor browser agent and update dependencies - Updated import statements to use 'patchright' instead of 'playwright'. - Cleaned up the BrowserUseAgent class for better readability. - Modified README instructions for browser installation. - Added new entries to .gitignore for PDF files and workflow. --- .gitignore | 4 +- README.md | 7 +- src/agent/browser_use/browser_use_agent.py | 96 ++++++++-------------- src/browser/custom_browser.py | 8 +- src/browser/custom_context.py | 4 +- tests/test_agents.py | 2 +- tests/test_playwright.py | 2 +- 7 files changed, 45 insertions(+), 78 deletions(-) diff --git a/.gitignore b/.gitignore index 548d48d..a7a55cd 100644 --- a/.gitignore +++ b/.gitignore @@ -187,4 +187,6 @@ data/ # For Config Files (Current Settings) .config.pkl -*.pdf \ No newline at end of file +*.pdf + +workflow \ No newline at end of file diff --git a/README.md b/README.md index 355ff76..91fb7fa 100644 --- a/README.md +++ b/README.md @@ -68,12 +68,7 @@ uv pip install -r requirements.txt Install Browsers in Playwright: You can install specific browsers by running: ```bash -playwright install --with-deps chromium -``` - -To install all browsers: -```bash -playwright install +patchright install chromium ``` #### Step 4: Configure Environment diff --git a/src/agent/browser_use/browser_use_agent.py b/src/agent/browser_use/browser_use_agent.py index a38211e..9234bca 100644 --- a/src/agent/browser_use/browser_use_agent.py +++ b/src/agent/browser_use/browser_use_agent.py @@ -1,75 +1,37 @@ from __future__ import annotations import asyncio -import gc -import inspect -import json import logging import os -import re -import time -from pathlib import Path -from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, TypeVar, Union - -from dotenv import load_dotenv -from langchain_core.language_models.chat_models import BaseChatModel -from langchain_core.messages import ( - BaseMessage, - HumanMessage, - SystemMessage, -) # from lmnr.sdk.decorators import observe -from pydantic import BaseModel, ValidationError - from browser_use.agent.gif import create_history_gif -from browser_use.agent.memory.service import Memory, MemorySettings -from browser_use.agent.message_manager.service import MessageManager, MessageManagerSettings -from browser_use.agent.message_manager.utils import convert_input_messages, extract_json_from_model_output, save_conversation -from browser_use.agent.prompts import AgentMessagePrompt, PlannerPrompt, SystemPrompt -from browser_use.agent.views import ( - REQUIRED_LLM_API_ENV_VARS, - ActionResult, - AgentError, - AgentHistory, - AgentHistoryList, - AgentOutput, - AgentSettings, - AgentState, - AgentStepInfo, - StepMetadata, - ToolCallingMethod, -) -from browser_use.browser.browser import Browser -from browser_use.browser.context import BrowserContext -from browser_use.browser.views import BrowserState, BrowserStateHistory -from browser_use.controller.registry.views import ActionModel -from browser_use.controller.service import Controller -from browser_use.dom.history_tree_processor.service import ( - DOMHistoryElement, - HistoryTreeProcessor, -) -from browser_use.exceptions import LLMException -from browser_use.telemetry.service import ProductTelemetry -from browser_use.telemetry.views import ( - AgentEndTelemetryEvent, - AgentRunTelemetryEvent, - AgentStepTelemetryEvent, -) -from browser_use.utils import check_env_variables, time_execution_async, time_execution_sync from browser_use.agent.service import Agent, AgentHookFunc +from browser_use.agent.views import ( + AgentHistoryList, + AgentStepInfo, +) +from browser_use.telemetry.views import ( + AgentEndTelemetryEvent, +) +from browser_use.utils import time_execution_async +from dotenv import load_dotenv load_dotenv() logger = logging.getLogger(__name__) -SKIP_LLM_API_KEY_VERIFICATION = os.environ.get('SKIP_LLM_API_KEY_VERIFICATION', 'false').lower()[0] in 'ty1' +SKIP_LLM_API_KEY_VERIFICATION = ( + os.environ.get("SKIP_LLM_API_KEY_VERIFICATION", "false").lower()[0] in "ty1" +) class BrowserUseAgent(Agent): - @time_execution_async('--run (agent)') + @time_execution_async("--run (agent)") async def run( - self, max_steps: int = 100, on_step_start: AgentHookFunc | None = None, - on_step_end: AgentHookFunc | None = None + self, + max_steps: int = 100, + on_step_start: AgentHookFunc | None = None, + on_step_end: AgentHookFunc | None = None, ) -> AgentHistoryList: """Execute the task with maximum number of steps""" @@ -88,7 +50,7 @@ class BrowserUseAgent(Agent): signal_handler.register() # Wait for verification task to complete if it exists - if hasattr(self, '_verification_task') and not self._verification_task.done(): + if hasattr(self, "_verification_task") and not self._verification_task.done(): try: await self._verification_task except Exception: @@ -100,7 +62,9 @@ class BrowserUseAgent(Agent): # Execute initial actions if provided if self.initial_actions: - result = await self.multi_act(self.initial_actions, check_for_new_elements=False) + result = await self.multi_act( + self.initial_actions, check_for_new_elements=False + ) self.state.last_result = result for step in range(max_steps): @@ -112,12 +76,14 @@ class BrowserUseAgent(Agent): # Check if we should stop due to too many failures if self.state.consecutive_failures >= self.settings.max_failures: - logger.error(f'❌ Stopping due to {self.settings.max_failures} consecutive failures') + logger.error( + f"❌ Stopping due to {self.settings.max_failures} consecutive failures" + ) break # Check control flags before each step if self.state.stopped: - logger.info('Agent stopped') + logger.info("Agent stopped") break while self.state.paused: @@ -142,13 +108,15 @@ class BrowserUseAgent(Agent): await self.log_completion() break else: - logger.info('❌ Failed to complete task in maximum steps') + logger.info("❌ Failed to complete task in maximum steps") return self.state.history except KeyboardInterrupt: # Already handled by our signal handler, but catch any direct KeyboardInterrupt as well - logger.info('Got KeyboardInterrupt during execution, returning current history') + logger.info( + "Got KeyboardInterrupt during execution, returning current history" + ) return self.state.history finally: @@ -171,8 +139,10 @@ class BrowserUseAgent(Agent): await self.close() if self.settings.generate_gif: - output_path: str = 'agent_history.gif' + output_path: str = "agent_history.gif" if isinstance(self.settings.generate_gif, str): output_path = self.settings.generate_gif - create_history_gif(task=self.task, history=self.state.history, output_path=output_path) \ No newline at end of file + create_history_gif( + task=self.task, history=self.state.history, output_path=output_path + ) diff --git a/src/browser/custom_browser.py b/src/browser/custom_browser.py index 6db980f..02875e3 100644 --- a/src/browser/custom_browser.py +++ b/src/browser/custom_browser.py @@ -1,17 +1,17 @@ import asyncio import pdb -from playwright.async_api import Browser as PlaywrightBrowser -from playwright.async_api import ( +from patchright.async_api import Browser as PlaywrightBrowser +from patchright.async_api import ( BrowserContext as PlaywrightBrowserContext, ) -from playwright.async_api import ( +from patchright.async_api import ( Playwright, async_playwright, ) from browser_use.browser.browser import Browser, IN_DOCKER from browser_use.browser.context import BrowserContext, BrowserContextConfig -from playwright.async_api import BrowserContext as PlaywrightBrowserContext +from patchright.async_api import BrowserContext as PlaywrightBrowserContext import logging from browser_use.browser.chrome import ( diff --git a/src/browser/custom_context.py b/src/browser/custom_context.py index 43a67a8..753b4c5 100644 --- a/src/browser/custom_context.py +++ b/src/browser/custom_context.py @@ -4,8 +4,8 @@ import os from browser_use.browser.browser import Browser, IN_DOCKER from browser_use.browser.context import BrowserContext, BrowserContextConfig -from playwright.async_api import Browser as PlaywrightBrowser -from playwright.async_api import BrowserContext as PlaywrightBrowserContext +from patchright.async_api import Browser as PlaywrightBrowser +from patchright.async_api import BrowserContext as PlaywrightBrowserContext from typing import Optional from browser_use.browser.context import BrowserContextState diff --git a/tests/test_agents.py b/tests/test_agents.py index 23a6fb0..ffa743f 100644 --- a/tests/test_agents.py +++ b/tests/test_agents.py @@ -169,7 +169,7 @@ async def test_browser_use_agent(): async def test_browser_use_parallel(): from browser_use.browser.context import BrowserContextWindowSize from browser_use.browser.browser import BrowserConfig - from playwright.async_api import async_playwright + from patchright.async_api import async_playwright from browser_use.browser.browser import Browser from src.browser.custom_context import BrowserContextConfig from src.controller.custom_controller import CustomController diff --git a/tests/test_playwright.py b/tests/test_playwright.py index 6704a02..5a522fd 100644 --- a/tests/test_playwright.py +++ b/tests/test_playwright.py @@ -6,7 +6,7 @@ load_dotenv() def test_connect_browser(): import os - from playwright.sync_api import sync_playwright + from patchright.sync_api import sync_playwright chrome_exe = os.getenv("CHROME_PATH", "") chrome_use_data = os.getenv("CHROME_USER_DATA", "") From 40a61fa216aeed578cb86339bc790ca1c286a8ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Magnus=20M=C3=BCller?= <67061560+MagMueller@users.noreply.github.com> Date: Fri, 2 May 2025 13:25:59 +0800 Subject: [PATCH 3/3] Added source = webui --- .../deep_research/deep_research_agent.py | 566 ++++++++++------ src/webui/components/browser_use_agent_tab.py | 610 ++++++++++++------ 2 files changed, 772 insertions(+), 404 deletions(-) diff --git a/src/agent/deep_research/deep_research_agent.py b/src/agent/deep_research/deep_research_agent.py index c9ee3c1..2f6c672 100644 --- a/src/agent/deep_research/deep_research_agent.py +++ b/src/agent/deep_research/deep_research_agent.py @@ -2,34 +2,38 @@ import asyncio import json import logging import os -import pdb +import threading import uuid from pathlib import Path -from typing import List, Dict, Any, TypedDict, Optional, Sequence, Annotated -from concurrent.futures import ThreadPoolExecutor, as_completed -import threading - -# Langchain imports -from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, ToolMessage, SystemMessage -from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_core.tools import Tool, StructuredTool -from langchain.agents import AgentExecutor # We might use parts, but Langgraph is primary -from langchain_community.tools.file_management import WriteFileTool, ReadFileTool, CopyFileTool, ListDirectoryTool, \ - MoveFileTool, FileSearchTool -from langchain_openai import ChatOpenAI # Replace with your actual LLM import -from pydantic import BaseModel, Field -import operator +from typing import Any, Dict, List, Optional, TypedDict from browser_use.browser.browser import BrowserConfig from browser_use.browser.context import BrowserContextWindowSize +from langchain_community.tools.file_management import ( + ListDirectoryTool, + ReadFileTool, + WriteFileTool, +) + +# Langchain imports +from langchain_core.messages import ( + AIMessage, + BaseMessage, + HumanMessage, + SystemMessage, + ToolMessage, +) +from langchain_core.prompts import ChatPromptTemplate +from langchain_core.tools import StructuredTool, Tool # Langgraph imports -from langgraph.graph import StateGraph, END -from src.controller.custom_controller import CustomController -from src.utils import llm_provider -from src.browser.custom_browser import CustomBrowser -from src.browser.custom_context import CustomBrowserContext, CustomBrowserContextConfig +from langgraph.graph import StateGraph +from pydantic import BaseModel, Field + from src.agent.browser_use.browser_use_agent import BrowserUseAgent +from src.browser.custom_browser import CustomBrowser +from src.browser.custom_context import CustomBrowserContextConfig +from src.controller.custom_controller import CustomController from src.utils.mcp_client import setup_mcp_client_and_tools logger = logging.getLogger(__name__) @@ -44,19 +48,22 @@ _BROWSER_AGENT_INSTANCES = {} async def run_single_browser_task( - task_query: str, - task_id: str, - llm: Any, # Pass the main LLM - browser_config: Dict[str, Any], - stop_event: threading.Event, - use_vision: bool = False, + task_query: str, + task_id: str, + llm: Any, # Pass the main LLM + browser_config: Dict[str, Any], + stop_event: threading.Event, + use_vision: bool = False, ) -> Dict[str, Any]: """ Runs a single BrowserUseAgent task. Manages browser creation and closing for this specific task. """ if not BrowserUseAgent: - return {"query": task_query, "error": "BrowserUseAgent components not available."} + return { + "query": task_query, + "error": "BrowserUseAgent components not available.", + } # --- Browser Setup --- # These should ideally come from the main agent's config @@ -79,9 +86,11 @@ async def run_single_browser_task( extra_args.append(f"--user-data-dir={browser_user_data_dir}") if use_own_browser: browser_binary_path = os.getenv("CHROME_PATH", None) or browser_binary_path - if browser_binary_path == "": browser_binary_path = None + if browser_binary_path == "": + browser_binary_path = None chrome_user_data = os.getenv("CHROME_USER_DATA", None) - if chrome_user_data: extra_args += [f"--user-data-dir={chrome_user_data}"] + if chrome_user_data: + extra_args += [f"--user-data-dir={chrome_user_data}"] else: browser_binary_path = None @@ -98,8 +107,10 @@ async def run_single_browser_task( context_config = CustomBrowserContextConfig( save_downloads_path="./tmp/downloads", - browser_window_size=BrowserContextWindowSize(width=window_w, height=window_h), - force_new_context=True + browser_window_size=BrowserContextWindowSize( + width=window_w, height=window_h + ), + force_new_context=True, ) bu_browser_context = await bu_browser.new_context(config=context_config) @@ -126,6 +137,7 @@ async def run_single_browser_task( browser_context=bu_browser_context, controller=bu_controller, use_vision=use_vision, + source="webui", ) # Store instance for potential stop() call @@ -157,7 +169,9 @@ async def run_single_browser_task( return {"query": task_query, "result": final_data, "status": "completed"} except Exception as e: - logger.error(f"Error during browser task for query '{task_query}': {e}", exc_info=True) + logger.error( + f"Error during browser task for query '{task_query}': {e}", exc_info=True + ) return {"query": task_query, "error": str(e), "status": "failed"} finally: if bu_browser_context: @@ -181,16 +195,17 @@ async def run_single_browser_task( class BrowserSearchInput(BaseModel): queries: List[str] = Field( - description=f"List of distinct search queries to find information relevant to the research task.") + description="List of distinct search queries to find information relevant to the research task." + ) async def _run_browser_search_tool( - queries: List[str], - task_id: str, # Injected dependency - llm: Any, # Injected dependency - browser_config: Dict[str, Any], - stop_event: threading.Event, - max_parallel_browsers: int = 1 + queries: List[str], + task_id: str, # Injected dependency + llm: Any, # Injected dependency + browser_config: Dict[str, Any], + stop_event: threading.Event, + max_parallel_browsers: int = 1, ) -> List[Dict[str, Any]]: """ Internal function to execute parallel browser searches based on LLM-provided queries. @@ -199,7 +214,9 @@ async def _run_browser_search_tool( # Limit queries just in case LLM ignores the description queries = queries[:max_parallel_browsers] - logger.info(f"[Browser Tool {task_id}] Running search for {len(queries)} queries: {queries}") + logger.info( + f"[Browser Tool {task_id}] Running search for {len(queries)} queries: {queries}" + ) results = [] semaphore = asyncio.Semaphore(max_parallel_browsers) @@ -207,7 +224,9 @@ async def _run_browser_search_tool( async def task_wrapper(query): async with semaphore: if stop_event.is_set(): - logger.info(f"[Browser Tool {task_id}] Skipping task due to stop signal: {query}") + logger.info( + f"[Browser Tool {task_id}] Skipping task due to stop signal: {query}" + ) return {"query": query, "result": None, "status": "cancelled"} # Pass necessary injected configs and the stop event return await run_single_browser_task( @@ -215,7 +234,7 @@ async def _run_browser_search_tool( task_id, llm, # Pass the main LLM (or a dedicated one if needed) browser_config, - stop_event + stop_event, # use_vision could be added here if needed ) @@ -226,35 +245,47 @@ async def _run_browser_search_tool( for i, res in enumerate(search_results): query = queries[i] # Get corresponding query if isinstance(res, Exception): - logger.error(f"[Browser Tool {task_id}] Gather caught exception for query '{query}': {res}", exc_info=True) - processed_results.append({"query": query, "error": str(res), "status": "failed"}) + logger.error( + f"[Browser Tool {task_id}] Gather caught exception for query '{query}': {res}", + exc_info=True, + ) + processed_results.append( + {"query": query, "error": str(res), "status": "failed"} + ) elif isinstance(res, dict): processed_results.append(res) else: - logger.error(f"[Browser Tool {task_id}] Unexpected result type for query '{query}': {type(res)}") - processed_results.append({"query": query, "error": "Unexpected result type", "status": "failed"}) + logger.error( + f"[Browser Tool {task_id}] Unexpected result type for query '{query}': {type(res)}" + ) + processed_results.append( + {"query": query, "error": "Unexpected result type", "status": "failed"} + ) - logger.info(f"[Browser Tool {task_id}] Finished search. Results count: {len(processed_results)}") + logger.info( + f"[Browser Tool {task_id}] Finished search. Results count: {len(processed_results)}" + ) return processed_results def create_browser_search_tool( - llm: Any, - browser_config: Dict[str, Any], - task_id: str, - stop_event: threading.Event, - max_parallel_browsers: int = 1, + llm: Any, + browser_config: Dict[str, Any], + task_id: str, + stop_event: threading.Event, + max_parallel_browsers: int = 1, ) -> StructuredTool: """Factory function to create the browser search tool with necessary dependencies.""" # Use partial to bind the dependencies that aren't part of the LLM call arguments from functools import partial + bound_tool_func = partial( _run_browser_search_tool, task_id=task_id, llm=llm, browser_config=browser_config, stop_event=stop_event, - max_parallel_browsers=max_parallel_browsers + max_parallel_browsers=max_parallel_browsers, ) return StructuredTool.from_function( @@ -269,6 +300,7 @@ Provide a list of distinct search queries(up to {max_parallel_browsers}) that ar # --- Langgraph State Definition --- + class ResearchPlanItem(TypedDict): step: int task: str @@ -298,6 +330,7 @@ class DeepResearchState(TypedDict): # --- Langgraph Nodes --- + def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]: """Loads state from files if they exist.""" state_updates = {} @@ -305,7 +338,7 @@ def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]: search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME) if os.path.exists(plan_file): try: - with open(plan_file, 'r', encoding='utf-8') as f: + with open(plan_file, "r", encoding="utf-8") as f: # Basic parsing, assumes markdown checklist format plan = [] step = 1 @@ -315,24 +348,36 @@ def _load_previous_state(task_id: str, output_dir: str) -> Dict[str, Any]: status = "completed" if line.startswith("- [x]") else "pending" task = line[5:].strip() plan.append( - ResearchPlanItem(step=step, task=task, status=status, queries=None, result_summary=None)) + ResearchPlanItem( + step=step, + task=task, + status=status, + queries=None, + result_summary=None, + ) + ) step += 1 - state_updates['research_plan'] = plan + state_updates["research_plan"] = plan # Determine next step index based on loaded plan - next_step = next((i for i, item in enumerate(plan) if item['status'] == 'pending'), len(plan)) - state_updates['current_step_index'] = next_step - logger.info(f"Loaded research plan from {plan_file}, next step index: {next_step}") + next_step = next( + (i for i, item in enumerate(plan) if item["status"] == "pending"), + len(plan), + ) + state_updates["current_step_index"] = next_step + logger.info( + f"Loaded research plan from {plan_file}, next step index: {next_step}" + ) except Exception as e: logger.error(f"Failed to load or parse research plan {plan_file}: {e}") - state_updates['error_message'] = f"Failed to load research plan: {e}" + state_updates["error_message"] = f"Failed to load research plan: {e}" if os.path.exists(search_file): try: - with open(search_file, 'r', encoding='utf-8') as f: - state_updates['search_results'] = json.load(f) + with open(search_file, "r", encoding="utf-8") as f: + state_updates["search_results"] = json.load(f) logger.info(f"Loaded search results from {search_file}") except Exception as e: logger.error(f"Failed to load search results {search_file}: {e}") - state_updates['error_message'] = f"Failed to load search results: {e}" + state_updates["error_message"] = f"Failed to load search results: {e}" # Decide if this is fatal or if we can continue without old results return state_updates @@ -342,10 +387,10 @@ def _save_plan_to_md(plan: List[ResearchPlanItem], output_dir: str): """Saves the research plan to a markdown checklist file.""" plan_file = os.path.join(output_dir, PLAN_FILENAME) try: - with open(plan_file, 'w', encoding='utf-8') as f: + with open(plan_file, "w", encoding="utf-8") as f: f.write("# Research Plan\n\n") for item in plan: - marker = "- [x]" if item['status'] == 'completed' else "- [ ]" + marker = "- [x]" if item["status"] == "completed" else "- [ ]" f.write(f"{marker} {item['task']}\n") logger.info(f"Research plan saved to {plan_file}") except Exception as e: @@ -357,7 +402,7 @@ def _save_search_results_to_json(results: List[Dict[str, Any]], output_dir: str) search_file = os.path.join(output_dir, SEARCH_INFO_FILENAME) try: # Simple overwrite for now, could be append - with open(search_file, 'w', encoding='utf-8') as f: + with open(search_file, "w", encoding="utf-8") as f: json.dump(results, f, indent=2, ensure_ascii=False) logger.info(f"Search results saved to {search_file}") except Exception as e: @@ -368,7 +413,7 @@ def _save_report_to_md(report: str, output_dir: Path): """Saves the final report to a markdown file.""" report_file = os.path.join(output_dir, REPORT_FILENAME) try: - with open(report_file, 'w', encoding='utf-8') as f: + with open(report_file, "w", encoding="utf-8") as f: f.write(report) logger.info(f"Final report saved to {report_file}") except Exception as e: @@ -378,17 +423,17 @@ def _save_report_to_md(report: str, output_dir: Path): async def planning_node(state: DeepResearchState) -> Dict[str, Any]: """Generates the initial research plan or refines it if resuming.""" logger.info("--- Entering Planning Node ---") - if state.get('stop_requested'): + if state.get("stop_requested"): logger.info("Stop requested, skipping planning.") return {"stop_requested": True} - llm = state['llm'] - topic = state['topic'] - existing_plan = state.get('research_plan') - existing_results = state.get('search_results') - output_dir = state['output_dir'] + llm = state["llm"] + topic = state["topic"] + existing_plan = state.get("research_plan") + existing_results = state.get("search_results") + output_dir = state["output_dir"] - if existing_plan and state.get('current_step_index', 0) > 0: + if existing_plan and state.get("current_step_index", 0) > 0: logger.info("Resuming with existing plan.") # Maybe add logic here to let LLM review and potentially adjust the plan # based on existing_results, but for now, we just use the loaded plan. @@ -397,8 +442,11 @@ async def planning_node(state: DeepResearchState) -> Dict[str, Any]: logger.info(f"Generating new research plan for topic: {topic}") - prompt = ChatPromptTemplate.from_messages([ - ("system", """You are a meticulous research assistant. Your goal is to create a step-by-step research plan to thoroughly investigate a given topic. + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """You are a meticulous research assistant. Your goal is to create a step-by-step research plan to thoroughly investigate a given topic. The plan should consist of clear, actionable research tasks or questions. Each step should logically build towards a comprehensive understanding. Format the output as a numbered list. Each item should represent a distinct research step or question. Example: @@ -410,9 +458,11 @@ async def planning_node(state: DeepResearchState) -> Dict[str, Any]: 6. Summarize the findings and draw conclusions. Keep the plan focused and manageable. Aim for 5-10 detailed steps. - """), - ("human", f"Generate a research plan for the topic: {topic}") - ]) + """, + ), + ("human", f"Generate a research plan for the topic: {topic}"), + ] + ) try: response = await llm.ainvoke(prompt.format_prompt(topic=topic).to_messages()) @@ -420,19 +470,25 @@ async def planning_node(state: DeepResearchState) -> Dict[str, Any]: # Parse the numbered list into the plan structure new_plan: List[ResearchPlanItem] = [] - for i, line in enumerate(plan_text.strip().split('\n')): + for i, line in enumerate(plan_text.strip().split("\n")): line = line.strip() if line and (line[0].isdigit() or line.startswith(("*", "-"))): # Simple parsing: remove number/bullet and space - task_text = line.split('.', 1)[-1].strip() if line[0].isdigit() else line[1:].strip() + task_text = ( + line.split(".", 1)[-1].strip() + if line[0].isdigit() + else line[1:].strip() + ) if task_text: - new_plan.append(ResearchPlanItem( - step=i + 1, - task=task_text, - status="pending", - queries=None, - result_summary=None - )) + new_plan.append( + ResearchPlanItem( + step=i + 1, + task=task_text, + status="pending", + queries=None, + result_summary=None, + ) + ) if not new_plan: logger.error("LLM failed to generate a valid plan structure.") @@ -458,16 +514,19 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: The LLM decides which tool (e.g., browser search) to use and provides arguments. """ logger.info("--- Entering Research Execution Node ---") - if state.get('stop_requested'): + if state.get("stop_requested"): logger.info("Stop requested, skipping research execution.") - return {"stop_requested": True, "current_step_index": state['current_step_index']} # Keep index same + return { + "stop_requested": True, + "current_step_index": state["current_step_index"], + } # Keep index same - plan = state['research_plan'] - current_index = state['current_step_index'] - llm = state['llm'] - tools = state['tools'] # Tools are now passed in state - output_dir = str(state['output_dir']) - task_id = state['task_id'] + plan = state["research_plan"] + current_index = state["current_step_index"] + llm = state["llm"] + tools = state["tools"] # Tools are now passed in state + output_dir = str(state["output_dir"]) + task_id = state["task_id"] # Stop event is bound inside the tool function, no need to pass directly here if not plan or current_index >= len(plan): @@ -476,24 +535,31 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: return {} current_step = plan[current_index] - if current_step['status'] == 'completed': + if current_step["status"] == "completed": logger.info(f"Step {current_step['step']} already completed, skipping.") return {"current_step_index": current_index + 1} # Move to next step - logger.info(f"Executing research step {current_step['step']}: {current_step['task']}") + logger.info( + f"Executing research step {current_step['step']}: {current_step['task']}" + ) # Bind tools to the LLM for this call llm_with_tools = llm.bind_tools(tools) - if state['messages']: - current_task_message = [HumanMessage( - content=f"Research Task (Step {current_step['step']}): {current_step['task']}")] - invocation_messages = state['messages'] + current_task_message + if state["messages"]: + current_task_message = [ + HumanMessage( + content=f"Research Task (Step {current_step['step']}): {current_step['task']}" + ) + ] + invocation_messages = state["messages"] + current_task_message else: current_task_message = [ SystemMessage( - content="You are a research assistant executing one step of a research plan. Use the available tools, especially the 'parallel_browser_search' tool, to gather information needed for the current task. Be precise with your search queries if using the browser tool."), + content="You are a research assistant executing one step of a research plan. Use the available tools, especially the 'parallel_browser_search' tool, to gather information needed for the current task. Be precise with your search queries if using the browser tool." + ), HumanMessage( - content=f"Research Task (Step {current_step['step']}): {current_step['task']}") + content=f"Research Task (Step {current_step['step']}): {current_step['task']}" + ), ] invocation_messages = current_task_message @@ -509,16 +575,17 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: if not isinstance(ai_response, AIMessage) or not ai_response.tool_calls: # LLM didn't call a tool. Maybe it answered directly? Or failed? logger.warning( - f"LLM did not call any tool for step {current_step['step']}. Response: {ai_response.content[:100]}...") + f"LLM did not call any tool for step {current_step['step']}. Response: {ai_response.content[:100]}..." + ) # How to handle this? Mark step as failed? Or store the content? # Let's mark as failed for now, assuming a tool was expected. - current_step['status'] = 'failed' - current_step['result_summary'] = "LLM did not use a tool as expected." + current_step["status"] = "failed" + current_step["result_summary"] = "LLM did not use a tool as expected." _save_plan_to_md(plan, output_dir) return { "research_plan": plan, "current_step_index": current_index + 1, - "error_message": f"LLM failed to call a tool for step {current_step['step']}." + "error_message": f"LLM failed to call a tool for step {current_step['step']}.", } # Process tool calls @@ -536,10 +603,12 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: if not selected_tool: logger.error(f"LLM called tool '{tool_name}' which is not available.") # Create a ToolMessage indicating the error - tool_results.append(ToolMessage( - content=f"Error: Tool '{tool_name}' not found.", - tool_call_id=tool_call_id - )) + tool_results.append( + ToolMessage( + content=f"Error: Tool '{tool_name}' not found.", + tool_call_id=tool_call_id, + ) + ) continue # Skip to next tool call if any # Execute the tool @@ -548,7 +617,7 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: stop_event = _AGENT_STOP_FLAGS.get(task_id) if stop_event and stop_event.is_set(): logger.info(f"Stop requested before executing tool: {tool_name}") - current_step['status'] = 'pending' # Not completed due to stop + current_step["status"] = "pending" # Not completed due to stop _save_plan_to_md(plan, output_dir) return {"stop_requested": True, "research_plan": plan} @@ -558,46 +627,67 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: logger.info(f"Tool '{tool_name}' executed successfully.") browser_tool_called = "parallel_browser_search" in executed_tool_names # Append result to overall search results - current_search_results = state.get('search_results', []) + current_search_results = state.get("search_results", []) if browser_tool_called: # Specific handling for browser tool output current_search_results.extend(tool_output) else: # Handle other tool outputs (e.g., file tools return strings) # Store it associated with the step? Or a generic log? # Let's just log it for now. Need better handling for diverse tool outputs. - logger.info(f"Result from tool '{tool_name}': {str(tool_output)[:200]}...") + logger.info( + f"Result from tool '{tool_name}': {str(tool_output)[:200]}..." + ) # Store result for potential next LLM call (if we were doing multi-turn) - tool_results.append(ToolMessage( - content=json.dumps(tool_output), - tool_call_id=tool_call_id - )) + tool_results.append( + ToolMessage( + content=json.dumps(tool_output), tool_call_id=tool_call_id + ) + ) except Exception as e: logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True) - tool_results.append(ToolMessage( - content=f"Error executing tool {tool_name}: {e}", - tool_call_id=tool_call_id - )) + tool_results.append( + ToolMessage( + content=f"Error executing tool {tool_name}: {e}", + tool_call_id=tool_call_id, + ) + ) # Also update overall state search_results with error? - current_search_results = state.get('search_results', []) + current_search_results = state.get("search_results", []) current_search_results.append( - {"tool_name": tool_name, "args": tool_args, "status": "failed", "error": str(e)}) + { + "tool_name": tool_name, + "args": tool_args, + "status": "failed", + "error": str(e), + } + ) # Basic check: Did the browser tool run at all? (More specific checks needed) browser_tool_called = "parallel_browser_search" in executed_tool_names # We might need a more nuanced status based on the *content* of tool_results - step_failed = any("Error:" in str(tr.content) for tr in tool_results) or not browser_tool_called + step_failed = ( + any("Error:" in str(tr.content) for tr in tool_results) + or not browser_tool_called + ) if step_failed: - logger.warning(f"Step {current_step['step']} failed or did not yield results via browser search.") - current_step['status'] = 'failed' - current_step[ - 'result_summary'] = f"Tool execution failed or browser tool not used. Errors: {[tr.content for tr in tool_results if 'Error' in str(tr.content)]}" + logger.warning( + f"Step {current_step['step']} failed or did not yield results via browser search." + ) + current_step["status"] = "failed" + current_step["result_summary"] = ( + f"Tool execution failed or browser tool not used. Errors: {[tr.content for tr in tool_results if 'Error' in str(tr.content)]}" + ) else: - logger.info(f"Step {current_step['step']} completed using tool(s): {executed_tool_names}.") - current_step['status'] = 'completed' + logger.info( + f"Step {current_step['step']} completed using tool(s): {executed_tool_names}." + ) + current_step["status"] = "completed" - current_step['result_summary'] = f"Executed tool(s): {', '.join(executed_tool_names)}." + current_step["result_summary"] = ( + f"Executed tool(s): {', '.join(executed_tool_names)}." + ) _save_plan_to_md(plan, output_dir) _save_search_results_to_json(current_search_results, output_dir) @@ -606,34 +696,39 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]: "research_plan": plan, "search_results": current_search_results, # Update with new results "current_step_index": current_index + 1, - "messages": state["messages"] + current_task_message + [ai_response] + tool_results, + "messages": state["messages"] + + current_task_message + + [ai_response] + + tool_results, # Optionally return the tool_results messages if needed by downstream nodes } except Exception as e: - logger.error(f"Unhandled error during research execution node for step {current_step['step']}: {e}", - exc_info=True) - current_step['status'] = 'failed' + logger.error( + f"Unhandled error during research execution node for step {current_step['step']}: {e}", + exc_info=True, + ) + current_step["status"] = "failed" _save_plan_to_md(plan, output_dir) return { "research_plan": plan, "current_step_index": current_index + 1, # Move on even if error? - "error_message": f"Core Execution Error on step {current_step['step']}: {e}" + "error_message": f"Core Execution Error on step {current_step['step']}: {e}", } async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: """Synthesizes the final report from the collected search results.""" logger.info("--- Entering Synthesis Node ---") - if state.get('stop_requested'): + if state.get("stop_requested"): logger.info("Stop requested, skipping synthesis.") return {"stop_requested": True} - llm = state['llm'] - topic = state['topic'] - search_results = state.get('search_results', []) - output_dir = state['output_dir'] - plan = state['research_plan'] # Include plan for context + llm = state["llm"] + topic = state["topic"] + search_results = state.get("search_results", []) + output_dir = state["output_dir"] + plan = state["research_plan"] # Include plan for context if not search_results: logger.warning("No search results found to synthesize report.") @@ -641,7 +736,9 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: _save_report_to_md(report, output_dir) return {"final_report": report} - logger.info(f"Synthesizing report from {len(search_results)} collected search result entries.") + logger.info( + f"Synthesizing report from {len(search_results)} collected search result entries." + ) # Prepare context for the LLM # Format search results nicely, maybe group by query or original plan step @@ -649,19 +746,21 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: references = {} ref_count = 1 for i, result_entry in enumerate(search_results): - query = result_entry.get('query', 'Unknown Query') - status = result_entry.get('status', 'unknown') - result_data = result_entry.get('result') # This should be the dict with summary, title, url - error = result_entry.get('error') + query = result_entry.get("query", "Unknown Query") + status = result_entry.get("status", "unknown") + result_data = result_entry.get( + "result" + ) # This should be the dict with summary, title, url + error = result_entry.get("error") - if status == 'completed' and result_data: + if status == "completed" and result_data: summary = result_data - formatted_results += f"### Finding from Query: \"{query}\"\n" + formatted_results += f'### Finding from Query: "{query}"\n' formatted_results += f"- **Summary:**\n{summary}\n" formatted_results += "---\n" - elif status == 'failed': - formatted_results += f"### Failed Query: \"{query}\"\n" + elif status == "failed": + formatted_results += f'### Failed Query: "{query}"\n' formatted_results += f"- **Error:** {error}\n" formatted_results += "---\n" # Ignore cancelled/other statuses for the report content @@ -669,12 +768,20 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: # Prepare the research plan context plan_summary = "\nResearch Plan Followed:\n" for item in plan: - marker = "- [x]" if item['status'] == 'completed' else "- [ ] (Failed)" if item[ - 'status'] == 'failed' else "- [ ]" + marker = ( + "- [x]" + if item["status"] == "completed" + else "- [ ] (Failed)" + if item["status"] == "failed" + else "- [ ]" + ) plan_summary += f"{marker} {item['task']}\n" - synthesis_prompt = ChatPromptTemplate.from_messages([ - ("system", """You are a professional researcher tasked with writing a comprehensive and well-structured report based on collected findings. + synthesis_prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """You are a professional researcher tasked with writing a comprehensive and well-structured report based on collected findings. The report should address the research topic thoroughly, synthesizing the information gathered from various sources. Structure the report logically: 1. **Introduction:** Briefly introduce the topic and the report's scope (mentioning the research plan followed is good). @@ -682,8 +789,11 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: 3. **Conclusion:** Summarize the main points and offer concluding thoughts or potential areas for further research. Ensure the tone is objective, professional, and analytical. Base the report **strictly** on the provided findings. Do not add external knowledge. If findings are contradictory or incomplete, acknowledge this. - """), - ("human", f""" + """, + ), + ( + "human", + f""" **Research Topic:** {topic} {plan_summary} @@ -696,25 +806,31 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: ``` Please generate the final research report in Markdown format based **only** on the information above. Ensure all claims derived from the findings are properly cited using the format [Reference_ID]. - """) - ]) + """, + ), + ] + ) try: - response = await llm.ainvoke(synthesis_prompt.format_prompt( - topic=topic, - plan_summary=plan_summary, - formatted_results=formatted_results, - references=references - ).to_messages()) + response = await llm.ainvoke( + synthesis_prompt.format_prompt( + topic=topic, + plan_summary=plan_summary, + formatted_results=formatted_results, + references=references, + ).to_messages() + ) final_report_md = response.content # Append the reference list automatically to the end of the generated markdown if references: report_references_section = "\n\n## References\n\n" # Sort refs by ID for consistent output - sorted_refs = sorted(references.values(), key=lambda x: x['id']) + sorted_refs = sorted(references.values(), key=lambda x: x["id"]) for ref in sorted_refs: - report_references_section += f"[{ref['id']}] {ref['title']} - {ref['url']}\n" + report_references_section += ( + f"[{ref['id']}] {ref['title']} - {ref['url']}\n" + ) final_report_md += report_references_section logger.info("Successfully synthesized the final report.") @@ -728,28 +844,32 @@ async def synthesis_node(state: DeepResearchState) -> Dict[str, Any]: # --- Langgraph Edges and Conditional Logic --- + def should_continue(state: DeepResearchState) -> str: """Determines the next step based on the current state.""" logger.info("--- Evaluating Condition: Should Continue? ---") - if state.get('stop_requested'): + if state.get("stop_requested"): logger.info("Stop requested, routing to END.") return "end_run" # Go to a dedicated end node for cleanup if needed - if state.get('error_message'): + if state.get("error_message"): logger.warning(f"Error detected: {state['error_message']}. Routing to END.") # Decide if errors should halt execution or if it should try to synthesize anyway return "end_run" # Stop on error for now - plan = state.get('research_plan') - current_index = state.get('current_step_index', 0) + plan = state.get("research_plan") + current_index = state.get("current_step_index", 0) if not plan: - logger.warning("No research plan found, cannot continue execution. Routing to END.") + logger.warning( + "No research plan found, cannot continue execution. Routing to END." + ) return "end_run" # Should not happen if planning node ran correctly # Check if there are pending steps in the plan if current_index < len(plan): logger.info( - f"Plan has pending steps (current index {current_index}/{len(plan)}). Routing to Research Execution.") + f"Plan has pending steps (current index {current_index}/{len(plan)}). Routing to Research Execution." + ) return "execute_research" else: logger.info("All plan steps processed. Routing to Synthesis.") @@ -758,8 +878,14 @@ def should_continue(state: DeepResearchState) -> str: # --- DeepSearchAgent Class --- + class DeepResearchAgent: - def __init__(self, llm: Any, browser_config: Dict[str, Any], mcp_server_config: Optional[Dict[str, Any]] = None): + def __init__( + self, + llm: Any, + browser_config: Dict[str, Any], + mcp_server_config: Optional[Dict[str, Any]] = None, + ): """ Initializes the DeepSearchAgent. @@ -779,16 +905,21 @@ class DeepResearchAgent: self.stop_event: Optional[threading.Event] = None self.runner: Optional[asyncio.Task] = None # To hold the asyncio task for run - async def _setup_tools(self, task_id: str, stop_event: threading.Event, max_parallel_browsers: int = 1) -> List[ - Tool]: + async def _setup_tools( + self, task_id: str, stop_event: threading.Event, max_parallel_browsers: int = 1 + ) -> List[Tool]: """Sets up the basic tools (File I/O) and optional MCP tools.""" - tools = [WriteFileTool(), ReadFileTool(), ListDirectoryTool()] # Basic file operations + tools = [ + WriteFileTool(), + ReadFileTool(), + ListDirectoryTool(), + ] # Basic file operations browser_use_tool = create_browser_search_tool( llm=self.llm, browser_config=self.browser_config, task_id=task_id, stop_event=stop_event, - max_parallel_browsers=max_parallel_browsers + max_parallel_browsers=max_parallel_browsers, ) tools += [browser_use_tool] # Add MCP tools if config is provided @@ -796,14 +927,18 @@ class DeepResearchAgent: try: logger.info("Setting up MCP client and tools...") if not self.mcp_client: - self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config) + self.mcp_client = await setup_mcp_client_and_tools( + self.mcp_server_config + ) mcp_tools = self.mcp_client.get_tools() logger.info(f"Loaded {len(mcp_tools)} MCP tools.") tools.extend(mcp_tools) except Exception as e: logger.error(f"Failed to set up MCP tools: {e}", exc_info=True) elif self.mcp_server_config: - logger.warning("MCP server config provided, but setup function unavailable.") + logger.warning( + "MCP server config provided, but setup function unavailable." + ) tools_map = {tool.name: tool for tool in tools} return tools_map.values() @@ -820,12 +955,16 @@ class DeepResearchAgent: workflow.add_node("plan_research", planning_node) workflow.add_node("execute_research", research_execution_node) workflow.add_node("synthesize_report", synthesis_node) - workflow.add_node("end_run", lambda state: logger.info("--- Reached End Run Node ---") or {}) # Simple end node + workflow.add_node( + "end_run", lambda state: logger.info("--- Reached End Run Node ---") or {} + ) # Simple end node # Define edges workflow.set_entry_point("plan_research") - workflow.add_edge("plan_research", "execute_research") # Always execute after planning + workflow.add_edge( + "plan_research", "execute_research" + ) # Always execute after planning # Conditional edge after execution workflow.add_conditional_edges( @@ -834,8 +973,8 @@ class DeepResearchAgent: { "execute_research": "execute_research", # Loop back if more steps "synthesize_report": "synthesize_report", # Move to synthesis if done - "end_run": "end_run" # End if stop requested or error - } + "end_run": "end_run", # End if stop requested or error + }, ) workflow.add_edge("synthesize_report", "end_run") # End after synthesis @@ -843,9 +982,13 @@ class DeepResearchAgent: app = workflow.compile() return app - async def run(self, topic: str, task_id: Optional[str] = None, save_dir: str = "./tmp/deep_research", - max_parallel_browsers: int = 1) -> Dict[ - str, Any]: + async def run( + self, + topic: str, + task_id: Optional[str] = None, + save_dir: str = "./tmp/deep_research", + max_parallel_browsers: int = 1, + ) -> Dict[str, Any]: """ Starts the deep research process (Async Generator Version). @@ -857,20 +1000,30 @@ class DeepResearchAgent: Intermediate state updates or messages during execution. """ if self.runner and not self.runner.done(): - logger.warning("Agent is already running. Please stop the current task first.") + logger.warning( + "Agent is already running. Please stop the current task first." + ) # Return an error status instead of yielding - return {"status": "error", "message": "Agent already running.", "task_id": self.current_task_id} + return { + "status": "error", + "message": "Agent already running.", + "task_id": self.current_task_id, + } self.current_task_id = task_id if task_id else str(uuid.uuid4()) output_dir = os.path.join(save_dir, self.current_task_id) os.makedirs(output_dir, exist_ok=True) - logger.info(f"[AsyncGen] Starting research task ID: {self.current_task_id} for topic: '{topic}'") + logger.info( + f"[AsyncGen] Starting research task ID: {self.current_task_id} for topic: '{topic}'" + ) logger.info(f"[AsyncGen] Output directory: {output_dir}") self.stop_event = threading.Event() _AGENT_STOP_FLAGS[self.current_task_id] = self.stop_event - agent_tools = await self._setup_tools(self.current_task_id, self.stop_event, max_parallel_browsers) + agent_tools = await self._setup_tools( + self.current_task_id, self.stop_event, max_parallel_browsers + ) initial_state: DeepResearchState = { "task_id": self.current_task_id, "topic": topic, @@ -894,11 +1047,15 @@ class DeepResearchAgent: initial_state.update(loaded_state) if loaded_state.get("research_plan"): logger.info( - f"Resuming with {len(loaded_state['research_plan'])} plan steps and {len(loaded_state.get('search_results', []))} existing results.") - initial_state[ - "topic"] = topic # Allow overriding topic even when resuming? Or use stored topic? Let's use new one. + f"Resuming with {len(loaded_state['research_plan'])} plan steps and {len(loaded_state.get('search_results', []))} existing results." + ) + initial_state["topic"] = ( + topic # Allow overriding topic even when resuming? Or use stored topic? Let's use new one. + ) else: - logger.warning(f"Resume requested for {task_id}, but no previous plan found. Starting fresh.") + logger.warning( + f"Resume requested for {task_id}, but no previous plan found. Starting fresh." + ) initial_state["current_step_index"] = 0 # --- Execute Graph using ainvoke --- @@ -955,17 +1112,22 @@ class DeepResearchAgent: "status": status, "message": message, "task_id": task_id_to_clean, # Use the stored task_id - "final_state": final_state if final_state else {} # Return the final state dict + "final_state": final_state + if final_state + else {}, # Return the final state dict } async def _stop_lingering_browsers(self, task_id): """Attempts to stop any BrowserUseAgent instances associated with the task_id.""" - keys_to_stop = [key for key in _BROWSER_AGENT_INSTANCES if key.startswith(f"{task_id}_")] + keys_to_stop = [ + key for key in _BROWSER_AGENT_INSTANCES if key.startswith(f"{task_id}_") + ] if not keys_to_stop: return logger.warning( - f"Found {len(keys_to_stop)} potentially lingering browser agents for task {task_id}. Attempting stop...") + f"Found {len(keys_to_stop)} potentially lingering browser agents for task {task_id}. Attempting stop..." + ) for key in keys_to_stop: agent_instance = _BROWSER_AGENT_INSTANCES.get(key) try: @@ -974,7 +1136,9 @@ class DeepResearchAgent: await agent_instance.stop() logger.info(f"Called stop() on browser agent instance {key}") except Exception as e: - logger.error(f"Error calling stop() on browser agent instance {key}: {e}") + logger.error( + f"Error calling stop() on browser agent instance {key}: {e}" + ) async def stop(self): """Signals the currently running agent task to stop.""" diff --git a/src/webui/components/browser_use_agent_tab.py b/src/webui/components/browser_use_agent_tab.py index 25f56bf..1657086 100644 --- a/src/webui/components/browser_use_agent_tab.py +++ b/src/webui/components/browser_use_agent_tab.py @@ -1,61 +1,53 @@ -import pdb +import asyncio +import json +import logging +import os +import uuid +from typing import Any, AsyncGenerator, Dict, Optional import gradio as gr -from gradio.components import Component -import asyncio -import os -import json -import uuid -import logging -from datetime import datetime -from typing import List, Dict, Optional, Any, Set, Generator, AsyncGenerator, Union -from collections.abc import Awaitable -from langchain_core.language_models.chat_models import BaseChatModel -import base64 -from browser_use.browser.browser import Browser, BrowserConfig -from browser_use.browser.context import BrowserContext, BrowserContextConfig, BrowserContextWindowSize + # from browser_use.agent.service import Agent -from browser_use.agent.views import AgentHistoryList -from browser_use.agent.views import ToolCallingMethod # Adjust import from browser_use.agent.views import ( - REQUIRED_LLM_API_ENV_VARS, - ActionResult, - AgentError, - AgentHistory, AgentHistoryList, AgentOutput, - AgentSettings, - AgentState, - AgentStepInfo, - StepMetadata, - ToolCallingMethod, ) -from browser_use.browser.browser import Browser -from browser_use.browser.context import BrowserContext -from browser_use.browser.views import BrowserState, BrowserStateHistory +from browser_use.browser.browser import BrowserConfig +from browser_use.browser.context import BrowserContext, BrowserContextWindowSize +from browser_use.browser.views import BrowserState +from gradio.components import Component +from langchain_core.language_models.chat_models import BaseChatModel -from src.webui.webui_manager import WebuiManager +from src.agent.browser_use.browser_use_agent import BrowserUseAgent +from src.browser.custom_browser import CustomBrowser +from src.browser.custom_context import CustomBrowserContextConfig from src.controller.custom_controller import CustomController from src.utils import llm_provider -from src.browser.custom_browser import CustomBrowser -from src.browser.custom_context import CustomBrowserContext, CustomBrowserContextConfig -from src.agent.browser_use.browser_use_agent import BrowserUseAgent +from src.webui.webui_manager import WebuiManager logger = logging.getLogger(__name__) # --- Helper Functions --- (Defined at module level) -async def _initialize_llm(provider: Optional[str], model_name: Optional[str], temperature: float, - base_url: Optional[str], api_key: Optional[str], num_ctx: Optional[int] = None) -> Optional[ - BaseChatModel]: + +async def _initialize_llm( + provider: Optional[str], + model_name: Optional[str], + temperature: float, + base_url: Optional[str], + api_key: Optional[str], + num_ctx: Optional[int] = None, +) -> Optional[BaseChatModel]: """Initializes the LLM based on settings. Returns None if provider/model is missing.""" if not provider or not model_name: logger.info("LLM Provider or Model Name not specified, LLM will be None.") return None try: # Use your actual LLM provider logic here - logger.info(f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}") + logger.info( + f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}" + ) # Example using a placeholder function llm = llm_provider.get_llm_model( provider=provider, @@ -64,18 +56,23 @@ async def _initialize_llm(provider: Optional[str], model_name: Optional[str], te base_url=base_url or None, api_key=api_key or None, # Add other relevant params like num_ctx for ollama - num_ctx=num_ctx if provider == "ollama" else None + num_ctx=num_ctx if provider == "ollama" else None, ) return llm except Exception as e: logger.error(f"Failed to initialize LLM: {e}", exc_info=True) gr.Warning( - f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {e}") + f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {e}" + ) return None -def _get_config_value(webui_manager: WebuiManager, comp_dict: Dict[gr.components.Component, Any], comp_id_suffix: str, - default: Any = None) -> Any: +def _get_config_value( + webui_manager: WebuiManager, + comp_dict: Dict[gr.components.Component, Any], + comp_id_suffix: str, + default: Any = None, +) -> Any: """Safely get value from component dictionary using its ID suffix relative to the tab.""" # Assumes component ID format is "tab_name.comp_name" tab_name = "browser_use_agent" # Hardcode or derive if needed @@ -93,7 +90,9 @@ def _get_config_value(webui_manager: WebuiManager, comp_dict: Dict[gr.components return comp_dict.get(comp, default) except KeyError: continue - logger.warning(f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup.") + logger.warning( + f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup." + ) return default @@ -103,12 +102,14 @@ def _format_agent_output(model_output: AgentOutput) -> str: if model_output: try: # Directly use model_dump if actions and current_state are Pydantic models - action_dump = [action.model_dump(exclude_none=True) for action in model_output.action] + action_dump = [ + action.model_dump(exclude_none=True) for action in model_output.action + ] state_dump = model_output.current_state.model_dump(exclude_none=True) model_output_dump = { - 'current_state': state_dump, - 'action': action_dump, + "current_state": state_dump, + "action": action_dump, } # Dump to JSON string with indentation json_string = json.dumps(model_output_dump, indent=4, ensure_ascii=False) @@ -117,7 +118,8 @@ def _format_agent_output(model_output: AgentOutput) -> str: except AttributeError as ae: logger.error( - f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'.") + f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'." + ) content = f"
Error: Could not format agent output (AttributeError: {ae}).\nRaw output: {str(model_output)}"
except Exception as e:
logger.error(f"Error formatting agent output: {e}", exc_info=True)
@@ -129,12 +131,17 @@ def _format_agent_output(model_output: AgentOutput) -> str:
# --- Updated Callback Implementation ---
-async def _handle_new_step(webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int):
+
+async def _handle_new_step(
+ webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int
+):
"""Callback for each step taken by the agent, including screenshot display."""
# Use the correct chat history attribute name from the user's code
- if not hasattr(webui_manager, 'bu_chat_history'):
- logger.error("Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message.")
+ if not hasattr(webui_manager, "bu_chat_history"):
+ logger.error(
+ "Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message."
+ )
# Initialize it maybe? Or raise an error? For now, log and potentially skip chat update.
webui_manager.bu_chat_history = [] # Initialize if missing (consider if this is the right place)
# return # Or stop if this is critical
@@ -145,21 +152,29 @@ async def _handle_new_step(webui_manager: WebuiManager, state: BrowserState, out
screenshot_html = ""
# Ensure state.screenshot exists and is not empty before proceeding
# Use getattr for safer access
- screenshot_data = getattr(state, 'screenshot', None)
+ screenshot_data = getattr(state, "screenshot", None)
if screenshot_data:
try:
# Basic validation: check if it looks like base64
- if isinstance(screenshot_data, str) and len(screenshot_data) > 100: # Arbitrary length check
+ if (
+ isinstance(screenshot_data, str) and len(screenshot_data) > 100
+ ): # Arbitrary length check
# *** UPDATED STYLE: Removed centering, adjusted width ***
img_tag = f'