fix deep research agent

This commit is contained in:
vvincent1234
2025-04-30 00:15:08 +08:00
parent dad8fc990a
commit 09e3f21e05
2 changed files with 216 additions and 98 deletions

View File

@@ -7,4 +7,4 @@ MainContentExtractor==0.0.4
langchain-ibm==0.3.10
langchain_mcp_adapters==0.0.9
langgraph==0.3.34
langchain-community==0.3.23
langchain-community

View File

@@ -2,6 +2,7 @@ import asyncio
import json
import logging
import os
import pdb
import uuid
from pathlib import Path
from typing import List, Dict, Any, TypedDict, Optional, Sequence, Annotated
@@ -16,6 +17,8 @@ from langchain.agents import AgentExecutor # We might use parts, but Langgraph
from langchain_community.tools.file_management import WriteFileTool, ReadFileTool, CopyFileTool, ListDirectoryTool, \
MoveFileTool, FileSearchTool
from langchain_openai import ChatOpenAI # Replace with your actual LLM import
from pydantic import BaseModel, Field
import operator
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContextWindowSize
@@ -37,7 +40,7 @@ os.makedirs(TMP_DIR, exist_ok=True)
REPORT_FILENAME = "report.md"
PLAN_FILENAME = "research_plan.md"
SEARCH_INFO_FILENAME = "search_info.json"
MAX_PARALLEL_BROWSERS = 2
MAX_PARALLEL_BROWSERS = 1
_AGENT_STOP_FLAGS = {}
_BROWSER_AGENT_INSTANCES = {} # To store running browser agents for stopping
@@ -175,41 +178,90 @@ async def run_single_browser_task(
logger.error(f"Error closing browser: {e}")
async def browser_search_tool_func(queries: List[str], task_id: str, llm: Any, browser_config: Dict[str, Any],
stop_event: threading.Event):
# Argument schema for the "parallel_browser_search" tool: the only value the LLM
# supplies is a list of query strings. The other arguments of the search
# coroutine (task_id, llm, browser_config, stop_event) are bound by the tool
# factory and are not part of this schema.
class BrowserSearchInput(BaseModel):
    # The description is an f-string, so the advertised query limit is taken
    # from MAX_PARALLEL_BROWSERS once, when this class body is executed.
    queries: List[str] = Field(
        description=f"List of distinct search queries (max {MAX_PARALLEL_BROWSERS}) to find information relevant to the research task.")
async def _run_browser_search_tool(
        queries: List[str],
        task_id: str,  # Injected dependency
        llm: Any,  # Injected dependency
        browser_config: Dict[str, Any],  # Injected dependency
        stop_event: threading.Event  # Injected dependency
) -> List[Dict[str, Any]]:
    """
    Run browser searches for LLM-provided queries and collect one result per query.

    Args:
        queries: Search queries; only the first MAX_PARALLEL_BROWSERS are used.
        task_id: Identifier of the research task, used in log messages and
            forwarded to ``run_single_browser_task``.
        llm: Model object forwarded to ``run_single_browser_task``.
        browser_config: Browser settings forwarded to ``run_single_browser_task``.
        stop_event: When set, queries that have not started yet are skipped and
            reported with status ``"cancelled"``.

    Returns:
        A list with one dict per processed query, in the same order as the
        (truncated) ``queries`` list. Dicts returned by
        ``run_single_browser_task`` are passed through unchanged; a query whose
        task raised, or returned a non-dict value, is reported as
        ``{"query": ..., "error": ..., "status": "failed"}``.
    """
    if not BrowserUseAgent:
        # Carry the same "status" key as every other failure result so callers
        # that filter on status do not silently drop these entries.
        return [
            {"query": q, "error": "BrowserUseAgent components not available.", "status": "failed"}
            for q in queries
        ]

    # Limit queries just in case the LLM ignores the schema description.
    queries = queries[:MAX_PARALLEL_BROWSERS]
    logger.info(f"[Browser Tool {task_id}] Running search for {len(queries)} queries: {queries}")

    # Bounds the number of browser tasks that run at the same time.
    semaphore = asyncio.Semaphore(MAX_PARALLEL_BROWSERS)

    async def task_wrapper(query):
        async with semaphore:
            # Re-check the stop flag only once a slot is free, right before
            # the search would actually start.
            if stop_event.is_set():
                logger.info(f"[Browser Tool {task_id}] Skipping task due to stop signal: {query}")
                return {"query": query, "result": None, "status": "cancelled"}
            return await run_single_browser_task(
                query,
                task_id,
                llm,
                browser_config,
                stop_event
            )

    # return_exceptions=True keeps one failing query from discarding the
    # results of the others; gather preserves the input order.
    search_results = await asyncio.gather(
        *(task_wrapper(query) for query in queries),
        return_exceptions=True
    )

    processed_results = []
    for query, res in zip(queries, search_results):
        if isinstance(res, BaseException):
            # BaseException (not Exception) so a CancelledError handed back by
            # gather is also reported as a failure. Passing the exception
            # object as exc_info attaches its traceback; exc_info=True would
            # log nothing useful here because no exception is being handled.
            logger.error(
                f"[Browser Tool {task_id}] Gather caught exception for query '{query}': {res}",
                exc_info=res
            )
            processed_results.append({"query": query, "error": str(res), "status": "failed"})
        elif isinstance(res, dict):
            processed_results.append(res)
        else:
            logger.error(f"[Browser Tool {task_id}] Unexpected result type for query '{query}': {type(res)}")
            processed_results.append({"query": query, "error": "Unexpected result type", "status": "failed"})

    logger.info(f"[Browser Tool {task_id}] Finished search. Results count: {len(processed_results)}")
    return processed_results
def create_browser_search_tool(
        llm: Any,
        browser_config: Dict[str, Any],
        task_id: str,
        stop_event: threading.Event
) -> StructuredTool:
    """
    Build the "parallel_browser_search" tool with its dependencies pre-bound.

    Args:
        llm: Model object bound into the search coroutine.
        browser_config: Browser settings bound into the search coroutine.
        task_id: Identifier of the research task bound into the search coroutine.
        stop_event: Stop flag bound into the search coroutine.

    Returns:
        A StructuredTool whose coroutine is ``_run_browser_search_tool`` with the
        four arguments above already supplied, so a tool call only has to
        provide ``queries`` (validated against ``BrowserSearchInput``). Only a
        coroutine is registered; no synchronous function is passed.
    """
    # Use partial to bind the dependencies that aren't part of the LLM call arguments
    from functools import partial

    bound_tool_func = partial(
        _run_browser_search_tool,
        task_id=task_id,
        llm=llm,
        browser_config=browser_config,
        stop_event=stop_event,
    )

    # The status list includes "cancelled" because the bound coroutine reports
    # that status for queries skipped after a stop signal.
    tool_description = (
        "Use this tool to actively search the web for information related to a specific research task or question.\n"
        f"It runs up to {MAX_PARALLEL_BROWSERS} searches in parallel using a browser agent for better results than simple scraping.\n"
        "Provide a list of distinct search queries that are likely to yield relevant information.\n"
        "The tool returns a list of results, each containing the original query, the status (completed, failed, stopped, cancelled), and the summarized information found (or an error message)."
    )

    return StructuredTool.from_function(
        coroutine=bound_tool_func,
        name="parallel_browser_search",
        description=tool_description,
        args_schema=BrowserSearchInput,
    )
# --- Langgraph State Definition ---
@@ -238,6 +290,8 @@ class DeepResearchState(TypedDict):
# Add other state variables as needed
error_message: Optional[str] # To store errors
messages: List[BaseMessage]
# --- Langgraph Nodes ---
@@ -398,23 +452,27 @@ async def planning_node(state: DeepResearchState) -> Dict[str, Any]:
async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]:
"""Executes the next step in the research plan using the browser tool."""
"""
Executes the next step in the research plan by invoking the LLM with tools.
The LLM decides which tool (e.g., browser search) to use and provides arguments.
"""
logger.info("--- Entering Research Execution Node ---")
if state.get('stop_requested'):
logger.info("Stop requested, skipping research execution.")
return {"stop_requested": True}
return {"stop_requested": True, "current_step_index": state['current_step_index']} # Keep index same
plan = state['research_plan']
current_index = state['current_step_index']
llm = state['llm']
browser_config = state['browser_config']
output_dir = state['output_dir']
tools = state['tools'] # Tools are now passed in state
output_dir = str(state['output_dir'])
task_id = state['task_id']
stop_event = _AGENT_STOP_FLAGS.get(task_id)
# Stop event is bound inside the tool function, no need to pass directly here
if not plan or current_index >= len(plan):
logger.info("Research plan complete or empty.")
return {} # Signal to move to synthesis or end
# This condition should ideally be caught by `should_continue` before reaching here
return {}
current_step = plan[current_index]
if current_step['status'] == 'completed':
@@ -423,93 +481,145 @@ async def research_execution_node(state: DeepResearchState) -> Dict[str, Any]:
logger.info(f"Executing research step {current_step['step']}: {current_step['task']}")
# 1. Generate Search Queries for the current task using LLM
query_gen_prompt = ChatPromptTemplate.from_messages([
("system",
f"You are an expert search query formulator. Given a research task, generate {MAX_PARALLEL_BROWSERS} distinct, effective search engine queries to find relevant information. Focus on diversity and different angles of the task. Output ONLY the queries, each on a new line."),
("human", f"Research Task: {current_step['task']}\n\nGenerate search queries:")
])
# Bind tools to the LLM for this call
llm_with_tools = llm.bind_tools(tools)
if state['messages']:
current_task_message = [HumanMessage(
content=f"Research Task (Step {current_step['step']}): {current_step['task']}")]
invocation_messages = state['messages'] + current_task_message
else:
current_task_message = [
SystemMessage(
content="You are a research assistant executing one step of a research plan. Use the available tools, especially the 'parallel_browser_search' tool, to gather information needed for the current task. Be precise with your search queries if using the browser tool."),
HumanMessage(
content=f"Research Task (Step {current_step['step']}): {current_step['task']}")
]
invocation_messages = current_task_message
try:
response = await llm.ainvoke(query_gen_prompt.format_prompt().to_messages())
queries = [q.strip() for q in response.content.strip().split('\n') if q.strip()]
if not queries:
# Invoke the LLM, expecting it to make a tool call
logger.info(f"Invoking LLM with tools for task: {current_step['task']}")
ai_response: BaseMessage = await llm_with_tools.ainvoke(invocation_messages)
logger.info("LLM invocation complete.")
tool_results = []
executed_tool_names = []
if not isinstance(ai_response, AIMessage) or not ai_response.tool_calls:
# LLM didn't call a tool. Maybe it answered directly? Or failed?
logger.warning(
f"LLM did not generate any search queries for task: {current_step['task']}. Using task itself as query.")
queries = [current_step['task']]
else:
queries = queries[:MAX_PARALLEL_BROWSERS] # Limit to max parallel
logger.info(f"Generated queries: {queries}")
current_step['queries'] = queries # Store generated queries in the plan item
except Exception as e:
logger.error(f"Failed to generate search queries: {e}. Using task as query.", exc_info=True)
queries = [current_step['task']]
current_step['queries'] = queries
# 2. Execute Searches using the Browser Tool
try:
search_results_list = await browser_search_tool_func(
queries=queries,
task_id=task_id,
llm=llm,
browser_config=browser_config,
stop_event=stop_event
)
# Check for stop signal *after* search execution attempt
if stop_event and stop_event.is_set():
logger.info("Stop requested during or after search execution.")
# Update plan partially if needed, or just signal stop
current_step['status'] = 'pending' # Mark as not completed due to stop
f"LLM did not call any tool for step {current_step['step']}. Response: {ai_response.content[:100]}...")
# How to handle this? Mark step as failed? Or store the content?
# Let's mark as failed for now, assuming a tool was expected.
current_step['status'] = 'failed'
current_step['result_summary'] = "LLM did not use a tool as expected."
_save_plan_to_md(plan, output_dir)
# Save any partial results gathered before stop
current_search_results = state.get('search_results', [])
current_search_results.extend([r for r in search_results_list if r.get('status') != 'cancelled'])
_save_search_results_to_json(current_search_results, output_dir)
return {"stop_requested": True, "search_results": current_search_results, "research_plan": plan}
return {
"research_plan": plan,
"current_step_index": current_index + 1,
"error_message": f"LLM failed to call a tool for step {current_step['step']}."
}
# 3. Process Results and Update State
successful_results = [r for r in search_results_list if r.get('status') == 'completed' and r.get('result')]
failed_queries = [r['query'] for r in search_results_list if r.get('status') == 'failed']
# Combine results with existing ones
all_search_results = state.get('search_results', [])
all_search_results.extend(search_results_list) # Add all results (incl. errors)
# Process tool calls
for tool_call in ai_response.tool_calls:
tool_name = tool_call.get("name")
tool_args = tool_call.get("args", {})
tool_call_id = tool_call.get("id") # Important for ToolMessage
if failed_queries:
logger.warning(f"Some queries failed: {failed_queries}")
# Optionally add logic to retry failed queries
logger.info(f"LLM requested tool call: {tool_name} with args: {tool_args}")
executed_tool_names.append(tool_name)
if successful_results:
# Optionally, summarize the findings for this step (could be another LLM call)
# current_step['result_summary'] = "Summary of findings..."
current_step['status'] = 'completed'
logger.info(f"Step {current_step['step']} completed successfully.")
# Find the corresponding tool instance
selected_tool = next((t for t in tools if t.name == tool_name), None)
if not selected_tool:
logger.error(f"LLM called tool '{tool_name}' which is not available.")
# Create a ToolMessage indicating the error
tool_results.append(ToolMessage(
content=f"Error: Tool '{tool_name}' not found.",
tool_call_id=tool_call_id
))
continue # Skip to next tool call if any
# Execute the tool
try:
# Stop check before executing the tool (tool itself also checks)
stop_event = _AGENT_STOP_FLAGS.get(task_id)
if stop_event and stop_event.is_set():
logger.info(f"Stop requested before executing tool: {tool_name}")
# How to report this back? Maybe skip execution, return special state?
# Let's update state and return stop_requested = True
current_step['status'] = 'pending' # Not completed due to stop
_save_plan_to_md(plan, output_dir)
return {"stop_requested": True, "research_plan": plan}
logger.info(f"Executing tool: {tool_name}")
# Assuming tool functions handle async correctly
tool_output = await selected_tool.ainvoke(tool_args)
logger.info(f"Tool '{tool_name}' executed successfully.")
browser_tool_called = "parallel_browser_search" in executed_tool_names
# Append result to overall search results
current_search_results = state.get('search_results', [])
if browser_tool_called: # Specific handling for browser tool output
current_search_results.extend(tool_output)
else: # Handle other tool outputs (e.g., file tools return strings)
# Store it associated with the step? Or a generic log?
# Let's just log it for now. Need better handling for diverse tool outputs.
logger.info(f"Result from tool '{tool_name}': {str(tool_output)[:200]}...")
# Store result for potential next LLM call (if we were doing multi-turn)
tool_results.append(ToolMessage(
content=json.dumps(tool_output),
tool_call_id=tool_call_id
))
except Exception as e:
logger.error(f"Error executing tool '{tool_name}': {e}", exc_info=True)
tool_results.append(ToolMessage(
content=f"Error executing tool {tool_name}: {e}",
tool_call_id=tool_call_id
))
# Also update overall state search_results with error?
current_search_results = state.get('search_results', [])
current_search_results.append(
{"tool_name": tool_name, "args": tool_args, "status": "failed", "error": str(e)})
# Basic check: Did the browser tool run at all? (More specific checks needed)
browser_tool_called = "parallel_browser_search" in executed_tool_names
# We might need a more nuanced status based on the *content* of tool_results
step_failed = any("Error:" in str(tr.content) for tr in tool_results) or not browser_tool_called
if step_failed:
logger.warning(f"Step {current_step['step']} failed or did not yield results via browser search.")
current_step['status'] = 'failed'
current_step[
'result_summary'] = f"Tool execution failed or browser tool not used. Errors: {[tr.content for tr in tool_results if 'Error' in str(tr.content)]}"
else:
# Decide how to handle steps with no successful results
logger.warning(f"Step {current_step['step']} completed but yielded no successful results.")
current_step['status'] = 'failed' # Or 'completed_no_results'
logger.info(f"Step {current_step['step']} completed using tool(s): {executed_tool_names}.")
current_step['status'] = 'completed'
current_step['result_summary'] = f"Executed tool(s): {', '.join(executed_tool_names)}."
# Update the plan file on disk
_save_plan_to_md(plan, output_dir)
# Update the search results file on disk
_save_search_results_to_json(all_search_results, output_dir)
_save_search_results_to_json(current_search_results, output_dir)
return {
"research_plan": plan,
"search_results": all_search_results,
"search_results": current_search_results, # Update with new results
"current_step_index": current_index + 1,
"error_message": None if not failed_queries else f"Failed queries: {failed_queries}"
"messages": state["messages"] + current_task_message + [ai_response] + tool_results,
# Optionally return the tool_results messages if needed by downstream nodes
}
except Exception as e:
logger.error(f"Error during research execution for step {current_step['step']}: {e}", exc_info=True)
logger.error(f"Unhandled error during research execution node for step {current_step['step']}: {e}",
exc_info=True)
current_step['status'] = 'failed'
_save_plan_to_md(plan, output_dir)
return {
"research_plan": plan,
"current_step_index": current_index + 1, # Move to next step even if failed? Or retry? Let's move on.
"error_message": f"Execution Error on step {current_step['step']}: {e}"
"current_step_index": current_index + 1, # Move on even if error?
"error_message": f"Core Execution Error on step {current_step['step']}: {e}"
}
@@ -668,15 +778,22 @@ class DeepSearchAgent:
self.stop_event: Optional[threading.Event] = None
self.runner: Optional[asyncio.Task] = None # To hold the asyncio task for run
async def _setup_tools(self) -> List[Tool]:
async def _setup_tools(self, task_id: str, stop_event: threading.Event) -> List[Tool]:
"""Sets up the basic tools (File I/O) and optional MCP tools."""
tools = [WriteFileTool(), ReadFileTool(), ListDirectoryTool(), CopyFileTool(),
MoveFileTool()] # Basic file operations
tools = [WriteFileTool(), ReadFileTool(), ListDirectoryTool()] # Basic file operations
browser_use_tool = create_browser_search_tool(
llm=self.llm,
browser_config=self.browser_config,
task_id=task_id,
stop_event=stop_event
)
tools += [browser_use_tool]
# Add MCP tools if config is provided
if self.mcp_server_config:
try:
logger.info("Setting up MCP client and tools...")
if self.mcp_client:
await self.mcp_client.__aexit__(None, None, None)
self.mcp_client = await setup_mcp_client_and_tools(self.mcp_server_config)
mcp_tools = self.mcp_client.get_tools()
logger.info(f"Loaded {len(mcp_tools)} MCP tools.")
@@ -744,12 +861,13 @@ class DeepSearchAgent:
self.stop_event = threading.Event()
_AGENT_STOP_FLAGS[self.current_task_id] = self.stop_event
agent_tools = await self._setup_tools()
agent_tools = await self._setup_tools(self.current_task_id, self.stop_event)
initial_state: DeepResearchState = {
"task_id": self.current_task_id,
"topic": topic,
"research_plan": [],
"search_results": [],
"messages": [],
"llm": self.llm,
"tools": agent_tools,
"output_dir": output_dir,