更新多智能体

This commit is contained in:
yuruo
2025-03-13 20:37:30 +08:00
parent 070c4c52c8
commit 48b4c23508
5 changed files with 215 additions and 33 deletions

View File

@@ -1,3 +1,4 @@
from pydantic import BaseModel, Field
from gradio_ui.agent.base_agent import BaseAgent
from xbrain.core.chat import run
@@ -6,35 +7,88 @@ class TaskPlanAgent(BaseAgent):
self.output_callback = output_callback
def __call__(self, user_task: str):
self.output_callback("正在规划任务中...", sender="bot")
response = run([{"role": "user", "content": user_task}], user_prompt=system_prompt)
self.output_callback("Starting task planning...", sender="bot")
response = run([{"role": "user", "content": user_task}], user_prompt=system_prompt, response_format=TaskPlanResponse)
self.output_callback(response, sender="bot")
return response
# One step of a task plan, used as the element type of TaskPlanResponse.task_plan.
# The Field descriptions are runtime strings attached to the model's fields and
# are therefore left in their original language.
# NOTE(review): the system prompt's output format also lists a "参数" (parameter)
# entry, e.g. the text to type; no field here carries it — confirm whether that
# is intentional.
class Plan(BaseModel):
    # State the screen/application is expected to be in after the action.
    expected_result: str = Field(description="操作后的预期状态")
    # Fallback to try when the action fails.
    error_handling: str = Field(description="操作失败时的替代方案")
    # Kind of action to perform.
    action: str = Field(description="操作类型")
    # Element the action is aimed at.
    target_element: str = Field(description="操作目标元素")
# Structured-output schema passed as `response_format` by TaskPlanAgent.__call__.
# NOTE(review): the sampling loop iterates directly over the value returned by
# the agent (`for plan in plan_list`); confirm what `run` returns when a
# response_format is supplied, and whether `.task_plan` should be iterated instead.
class TaskPlanResponse(BaseModel):
    # Ordered sequence of concrete operation steps.
    task_plan: list[Plan] = Field(description="具体的操作步骤序列")
system_prompt = """
### 目标 ###
你是电脑任务规划专家,根据用户需求,规划出要执行的任务。
##########
你是自动化操作规划专家,根据屏幕内容和用户需求,规划精确可执行的操作序列。
### 输入 ###
用户需求,通常是一个文本描述。
##########
### 输出 ###
一系列任务,包括任务名称
##########
1. 用户需求:文本描述形式的任务目标
2. 当前环境:屏幕上可见的元素和状态
### 输出格式 ###
操作序列应采用以下JSON格式
[
{
"操作类型": "点击/输入/拖拽/等待/判断...",
"目标元素": "元素描述或坐标",
"参数": "具体参数,如文本内容",
"预期结果": "操作后的预期状态",
"错误处理": "操作失败时的替代方案"
},
]
### 操作类型说明 ###
- 左键点击:在特定元素或坐标上执行点击
- 右键点击:在特定元素或坐标上执行右键点击
- 输入:在输入框中输入文本
- 等待:等待特定元素出现或状态变化
- 滚动:上下或左右滚动屏幕
### 例子 ###
案例1
输入获取AI新闻
输出:
1. 打开浏览器
2. 打开百度首页
3. 搜索“AI”相关内容
4. 浏览搜索结果,记录搜索结果
5. 返回搜索内容
案例2
输入删除桌面的txt文件
输出:
1. 进入桌面
2. 寻找所有txt文件
3. 右键txt文件选择删除
[
{
"操作类型": "点击",
"目标元素": "浏览器图标",
"参数": "",
"预期结果": "浏览器打开",
"错误处理": "如未找到浏览器图标,尝试通过开始菜单搜索浏览器"
},
{
"操作类型": "输入",
"目标元素": "地址栏",
"参数": "https://www.baidu.com",
"预期结果": "百度首页加载完成",
"错误处理": "如连接失败,重试或尝试其他搜索引擎"
},
{
"操作类型": "输入",
"目标元素": "搜索框",
"参数": "AI最新新闻",
"预期结果": "搜索框填充完成",
"错误处理": "如搜索框不可用,尝试刷新页面"
},
{
"操作类型": "点击",
"目标元素": "搜索按钮",
"参数": "",
"预期结果": "显示搜索结果页",
"错误处理": "如点击无反应,尝试按回车键"
},
{
"操作类型": "判断",
"目标元素": "搜索结果列表",
"参数": "包含AI相关内容",
"预期结果": "找到相关新闻",
"错误处理": "如无相关结果,尝试修改搜索关键词"
}
]
"""

View File

@@ -16,11 +16,9 @@ class TaskRunAgent(BaseAgent):
def __call__(self, task_plan, parsed_screen):
screen_info = str(parsed_screen['parsed_content_list'])
self.SYSTEM_PROMPT = system_prompt.format(task_plan=task_plan,
self.SYSTEM_PROMPT = system_prompt.format(task_plan=str(task_plan),
device=self.get_device(),
screen_info=screen_info)
screen_width, screen_height = parsed_screen['width'], parsed_screen['height']
img_to_show = parsed_screen["image"]
buffered = BytesIO()
img_to_show.save(buffered, format="PNG")

View File

@@ -0,0 +1,81 @@
from anthropic import BaseModel
from pydantic import Field
from gradio_ui.agent.base_agent import BaseAgent
from xbrain.core.chat import run
class VerificationAgent(BaseAgent):
    """Agent that asks the LLM whether an expected result has been reached.

    The verdict is requested with ``VerificationResponse`` as the response
    format and is both reported through ``output_callback`` and returned.
    """

    def __init__(self, output_callback):
        # Callable invoked as output_callback(payload, sender="bot").
        self.output_callback = output_callback

    def __call__(self, expected_result):
        """Send ``expected_result`` as the only user message and return the verdict."""
        conversation = [{"role": "user", "content": expected_result}]
        verdict = run(
            conversation,
            user_prompt=prompt,
            response_format=VerificationResponse,
        )
        self.output_callback(verdict, sender="bot")
        return verdict
# Structured-output schema passed as `response_format` by VerificationAgent.__call__.
# The Field descriptions are runtime strings and are left in their original language.
# NOTE(review): BaseModel is imported from `anthropic` in this file while the
# sibling task-plan module imports it from `pydantic` — confirm which is intended.
class VerificationResponse(BaseModel):
    # Verification status; the schema advertises the two values "success" and "error".
    # NOTE(review): presumably json_schema_extra only annotates the generated
    # schema and does not validate the value — confirm against pydantic docs.
    verification_status: str = Field(description="验证状态", json_schema_extra={"enum": ["success", "error"]})
    # How the result was verified.
    verification_method: str = Field(description="验证方法")
    # Evidence supporting the verdict.
    evidence: str = Field(description="证据")
    # Confidence score; the prompt describes it as a number from 0 to 100.
    confidence: int = Field(description="置信度")
    # Reason for the failure, when verification fails.
    failure_reason: str = Field(description="失败原因")
    # Suggested remedy measures.
    remedy_measures: list[str] = Field(description="补救措施")
# System prompt for VerificationAgent.
# The keys and status values shown in the "输出格式" section are the field names
# and enum values of VerificationResponse, so the prompt, its worked example and
# the enforced response schema all describe the same JSON object. Both JSON
# snippets avoid trailing commas so the example is valid JSON.
prompt = """
### 目标 ###
你是自动化验证专家,负责确认每个操作后的预期结果是否达成,保证自动化流程可靠执行。
### 输入 ###
1. 操作信息:刚执行的操作类型和参数
2. 屏幕状态:当前屏幕上的视觉元素和状态
3. 预期结果:操作应该产生的效果
### 输出格式 ###
验证结果应采用以下JSON格式
{
"verification_status": "success/error",
"verification_method": "使用的验证方法",
"evidence": "支持验证结果的具体证据",
"confidence": 0-100的数值,
"failure_reason": "如果失败,分析可能的原因",
"remedy_measures": [
"建议的补救措施1",
"建议的补救措施2"
]
}
### 验证方法 ###
1. **视觉验证**识别特定UI元素是否出现或消失
- 元素存在性:检查某元素是否存在
- 元素状态:检查元素是否处于特定状态(激活、禁用等)
- 视觉变化:检查屏幕特定区域是否发生变化
2. **内容验证**:确认特定文本或数据是否正确
- 文本匹配:页面上是否包含预期文本
- 数据一致性:显示的数据是否符合预期
- 计数验证:元素数量是否符合预期
3. **系统状态验证**:检查系统响应
- 进程状态:特定进程是否运行
- 文件变化:文件是否被创建、修改或删除
- 网络活动:是否有特定网络请求或响应
### 验证策略 ###
- **重试机制**:指定最大重试次数和间隔时间
- **渐进式验证**:先验证基础条件,再验证详细条件
- **模糊匹配**:允许近似匹配而非精确匹配
- **超时设置**:指定验证的最长等待时间
### 例子 ###
操作:点击"登录"按钮
预期结果:登录成功并显示首页
验证输出:
{
"verification_status": "success",
"verification_method": "视觉验证+内容验证",
"evidence": "1. 检测到欢迎消息'你好,用户名' 2. 导航栏显示用户头像 3. URL已变更为首页地址",
"confidence": 95,
"failure_reason": "",
"remedy_measures": []
}
"""

View File

@@ -192,8 +192,6 @@ def process_input(user_input, state, vision_agent_state):
messages=state["messages"],
output_callback=partial(chatbot_output_callback, chatbot_state=state['chatbot_messages'], hide_images=False),
tool_output_callback=partial(_tool_output_callback, tool_state=state["tools"]),
api_response_callback=partial(_api_response_callback, response_state=state["responses"]),
only_n_most_recent_images=state["only_n_most_recent_images"],
vision_agent = agent
):
if loop_msg is None or state.get("stop"):

View File

@@ -4,6 +4,7 @@ Agentic sampling loop that calls the Anthropic API and local implementation of
from collections.abc import Callable
from time import sleep
import cv2
from gradio_ui.agent.verification_agent import VerificationAgent
from gradio_ui.agent.vision_agent import VisionAgent
from gradio_ui.tools.screen_capture import get_screenshot
from anthropic import APIResponse
@@ -27,8 +28,6 @@ def sampling_loop_sync(
messages: list[BetaMessageParam],
output_callback: Callable[[BetaContentBlock], None],
tool_output_callback: Callable[[ToolResult, str], None],
api_response_callback: Callable[[APIResponse[BetaMessage]], None],
only_n_most_recent_images: int | None = 0,
vision_agent: VisionAgent
):
"""
@@ -41,11 +40,9 @@ def sampling_loop_sync(
tool_output_callback=tool_output_callback,
)
tool_result_content = None
plan = task_plan_agent(user_task = messages[-1]["content"][0].text)
plan_list = task_plan_agent(user_task = messages[-1]["content"][0].text)
task_run_agent = TaskRunAgent(output_callback=output_callback)
while True:
for plan in plan_list:
parsed_screen = parse_screen(vision_agent)
tools_use_needed, __ = task_run_agent(task_plan=plan, parsed_screen=parsed_screen)
sleep(2)
@@ -53,6 +50,8 @@ def sampling_loop_sync(
yield message
if not tool_result_content:
return messages
sampling_loop_with_recovery(model, messages, vision_agent)
def parse_screen(vision_agent: VisionAgent):
screenshot, screenshot_path = get_screenshot()
@@ -93,4 +92,56 @@ def draw_elements(screenshot, parsed_content_list):
opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(opencv_image)
return pil_image
return pil_image
def sampling_loop_with_recovery(model, messages, vision_agent, max_retries=3):
    """Run the tool step, verify its outcome, and retry with recovery actions.

    Because the body contains ``yield``, this is a generator function: it
    yields every message produced by ``executor`` and ends with
    ``return messages``, so the list is only available to a caller as the
    generator's return value.

    Args:
        model: Forwarded to ``generate_recovery_plan``.
        messages: Conversation list; status entries are appended to it in place.
        vision_agent: Not referenced anywhere in this body.
        max_retries: Number of failed verifications after which the loop stops.

    NOTE(review): ``executor``, ``tools_use_needed``, ``verification_agent``,
    ``plan`` and ``generate_recovery_plan`` are neither parameters nor locals
    here and no module-level definition is visible — confirm they exist at
    module scope, otherwise the first iteration raises NameError.
    NOTE(review): the visible call site invokes this function as a bare
    statement; calling a generator function only creates the generator, so
    nothing below runs unless the result is iterated — confirm.
    """
    retries = 0
    while retries < max_retries:
        # Run the original operation, streaming each produced message to the caller.
        for message, tool_result_content in executor(tools_use_needed, messages):
            yield message
        # Stop when the last executor step produced no tool result.
        if not tool_result_content:
            return messages
        # Verify the outcome against the plan step's expected result.
        verification_result = verification_agent(plan["expected_result"])
        # Verification succeeded: record it and finish.
        # NOTE(review): verification_result is subscripted like a dict here and
        # below — confirm the verifier returns a mapping rather than a model object.
        if verification_result["verification_status"] == "success":
            messages.append({
                "role": "system",
                "content": "验证成功:操作达到预期结果"
            })
            return messages
        # Verification failed: count the attempt and try recovery actions.
        # NOTE(review): any status other than "success"/"error" skips both
        # branches without incrementing `retries`, so the loop would repeat
        # without bound — confirm the status is restricted to these two values.
        elif verification_result["verification_status"] == "error":
            retries += 1
            # Record the failed verification.
            # NOTE(review): VerificationResponse declares `failure_reason`, not
            # `error_message`, so this lookup presumably always falls back to
            # the default text — confirm.
            messages.append({
                "role": "system",
                "content": f"验证失败(第{retries}次尝试):{verification_result.get('error_message', '未达到预期结果')}"
            })
            if retries >= max_retries:
                messages.append({
                    "role": "system",
                    "content": "达到最大重试次数,操作失败。"
                })
                return messages
            # Build a recovery plan from the failed verification.
            recovery_plan = generate_recovery_plan(model, messages, verification_result)
            messages.append({
                "role": "system",
                "content": f"正在执行补救措施:{recovery_plan['description']}"
            })
            # Execute the recovery actions, streaming their messages as well.
            for recovery_message, recovery_result in executor(recovery_plan["recovery_actions"], messages):
                yield recovery_message
            # Fall through to the next loop iteration, which re-runs and re-verifies.