更新区域查找,多agent逻辑优化

This commit is contained in:
yuruo 2025-03-15 19:50:59 +08:00
parent 2752ee22ac
commit a845586be8
9 changed files with 114 additions and 150 deletions

View File

@ -13,7 +13,11 @@ https://github.com/user-attachments/assets/35969270-873f-44d7-8b26-0944c76ba9f1
</div>
> Special Note: The autoMate project is still in a very early stage. Its current capabilities are limited and it's primarily for learning and communication purposes. However, we are continuously seeking breakthroughs and integrating the latest technologies!
> Special Note: The autoMate project is still in a very early stage. Its current capabilities are limited and it's primarily for learning and communication purposes. However, we are continuously seeking breakthroughs and integrating the latest technologies! If you have any questions, feel free to add me on WeChat.
<div align="center">
<img src="./resources/wxchat.png" width="120" height="120" alt="WeChat QR code">
</div>
## 💫 Redefining Your Relationship with Computers

View File

@ -11,7 +11,12 @@ https://github.com/user-attachments/assets/35969270-873f-44d7-8b26-0944c76ba9f1
</div>
> 特别声明autoMate 项目还处于非常早期阶段,目前的能力还不足以解决任何问题,当前仅限于学习和交流。不过我会不断的寻求突破点,不停地融入最新的技术!
> 特别声明:autoMate 项目还处于非常早期阶段,目前的能力还不足以解决任何问题,当前仅限于学习和交流。不过我会不断地寻求突破点,不停地融入最新的技术!如果你有任何疑问,也可以加vx好友,入群交流。
<div align="center">
<img src="./resources/wxchat.png" width="120" height="120" alt="微信二维码">
</div>
## 💫 重新定义你与电脑的关系
@ -34,6 +39,7 @@ autoMate 是一款革命性的AI+RPA自动化工具基于OmniParser构建
- 🚅 简化安装 - 比官方版本更简洁的安装流程,支持中文环境,一键部署
## 🚀 快速开始
### 📦 安装

View File

@ -1,4 +1,6 @@
| Vendor-en | Vendor-ch | Model | base-url |
| --- | --- | --- | --- |
| openainext | openainext | gpt-4o-2024-11-20 | https://api.openai-next.com/v1 |
| openainext | openainext | gpt-4.5-preview-2025-02-27 | https://api.openai-next.com/v1 |
| openainext international| openainext 国际| gpt-4o-2024-11-20 | https://api.openai-next.com/v1 |
| openainext international| openainext 国际| gpt-4.5-preview-2025-02-27 | https://api.openai-next.com/v1 |
| openainext china | openainext 中国 | gpt-4o-2024-11-20 | https://cn.api.openai-next.com/v1 |
| openainext china | openainext 中国 | gpt-4.5-preview-2025-02-27 | https://cn.api.openai-next.com/v1 |

View File

@ -3,9 +3,11 @@ from pydantic import BaseModel, Field
from gradio_ui.agent.base_agent import BaseAgent
from xbrain.core.chat import run
from gradio_ui.tools.computer import Action
class TaskPlanAgent(BaseAgent):
def __call__(self, messages, parsed_screen_result):
screen_info = str(parsed_screen_result['parsed_content_list'])
screen_info = str([{"box_id": i.element_id, "caption": i.caption, "text": i.text} for i in parsed_screen_result['parsed_content_list']])
messages[-1] = {"role": "user",
"content": [
{"type": "text", "text": messages[-1]["content"]},
@ -15,20 +17,14 @@ class TaskPlanAgent(BaseAgent):
}
]
}
response = run(messages, user_prompt=system_prompt.format(screen_info=screen_info), response_format=TaskPlanResponse)
response = run(messages, user_prompt=system_prompt.format(screen_info=screen_info, action_list=str(Action)), response_format=TaskPlanResponse)
print("task_plan_agent response: ", response)
return json.loads(response)
# One planned step in a task plan. The Field descriptions are runtime text
# (they are part of the model's schema), so they are left untouched; the
# comments below only translate them.
class Plan(BaseModel):
# Expected state after the operation has been performed.
expected_result: str = Field(description="操作后的预期状态")
# Alternative approach to use when the operation fails.
error_handling: str = Field(description="操作失败时的替代方案")
# Type of operation to perform.
action: str = Field(description="操作类型")
# Element the operation targets.
target_element: str = Field(description="操作目标元素")
class TaskPlanResponse(BaseModel):
reasoning: str = Field(description="描述您规划任务的逻辑")
task_plan: list[Plan] = Field(description="具体的操作步骤序列")
task_list: list[str] = Field(description="任务列表")
system_prompt = """
@ -46,59 +42,34 @@ system_prompt = """
操作序列应采用以下JSON格式
[
{{
"操作类型": "点击/输入/拖拽/等待/判断...",
"目标元素": "元素描述或坐标",
"参数": "具体参数,如文本内容",
"预期结果": "操作后的预期状态",
"错误处理": "操作失败时的替代方案"
"reasoning": "描述您规划任务的逻辑",
"task_plan": ["任务1", "任务2", "任务3"]
}}
]
### 操作类型说明 ###
- 左键点击在特定元素或坐标上执行点击
- 右键点击在特定元素或坐标上执行右键点击
- 输入在输入框中输入文本
- 等待等待特定元素出现或状态变化
- 滚动上下或左右滚动屏幕
任务中的操作应该仅包含
{action_list}
### 限制 ###
- 不要说点击xx坐标这样用户无法理解应该说点击地址栏搜索框输入按钮等
### 例子 ###
输入获取AI新闻
输出
[
{{
"操作类型": "点击",
"目标元素": "浏览器图标",
"参数": "",
"预期结果": "浏览器打开",
"错误处理": "如未找到浏览器图标,尝试通过开始菜单搜索浏览器"
"reasoning": "看到有一个地址栏所以应该在地址栏输入https://www.baidu.com",
"task_plan": ["在地址栏输入https://www.baidu.com"]
}},
{{
"操作类型": "输入",
"目标元素": "地址栏",
"参数": "https://www.baidu.com",
"预期结果": "百度首页加载完成",
"错误处理": "如连接失败,重试或尝试其他搜索引擎"
"reasoning": "这是百度页面看到有一个搜索框所以应该在搜索框输入AI最新新闻",
"task_plan": ["在搜索框输入AI最新新闻"]
}},
{{
"操作类型": "输入",
"目标元素": "搜索框",
"参数": "AI最新新闻",
"预期结果": "搜索框填充完成",
"错误处理": "如搜索框不可用,尝试刷新页面"
}},
{{
"操作类型": "点击",
"目标元素": "搜索按钮",
"参数": "",
"预期结果": "显示搜索结果页",
"错误处理": "如点击无反应,尝试按回车键"
}},
{{
"操作类型": "判断",
"目标元素": "搜索结果列表",
"参数": "包含AI相关内容",
"预期结果": "找到相关新闻",
"错误处理": "如无相关结果,尝试修改搜索关键词"
"reasoning": "看到有一个搜索按钮,所以应该点击搜索按钮",
"task_plan": ["点击搜索按钮"]
}}
]
"""

View File

@ -1,4 +1,5 @@
from enum import Enum
import json
import uuid
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock, BetaMessageParam, BetaUsage
@ -13,11 +14,7 @@ class TaskRunAgent(BaseAgent):
def __init__(self):
self.OUTPUT_DIR = "./tmp/outputs"
def __call__(self, task_plan, parsed_screen_result, messages):
screen_info = str(parsed_screen_result['parsed_content_list'])
self.SYSTEM_PROMPT = system_prompt.format(task_plan=str(task_plan),
device=self.get_device(),
screen_info=screen_info)
def __call__(self, parsed_screen_result, messages):
messages.append(
{"role": "user",
"content": [
@ -29,9 +26,14 @@ class TaskRunAgent(BaseAgent):
]
}
)
task_list = json.loads(messages[1]['content'])['task_list']
# Convert task_list to a numbered format
formatted_task_list = "\n".join([f"{i}.{task}" for i, task in enumerate(task_list)])
screen_info = str([{"box_id": i.element_id, "caption": i.caption, "text": i.text} for i in parsed_screen_result['parsed_content_list']])
system_prompt = prompt.format(screen_info=screen_info, task_list=formatted_task_list)
vlm_response = run(
messages,
user_prompt=self.SYSTEM_PROMPT,
user_prompt=system_prompt,
response_format=TaskRunAgentResponse
)
vlm_response_json = json.loads(vlm_response)
@ -65,29 +67,6 @@ class TaskRunAgent(BaseAgent):
return element
return None
def get_device(self):
    """Return a short, human-readable description of the host operating system.

    Returns:
        str: ``"Windows <release>"``, ``"Mac OS <version>"`` or
        ``"Linux <release>"`` for the three recognised platforms, the raw
        ``platform.system()`` name for any other platform, and ``"Unknown"``
        when the platform cannot be determined at all.
    """
    system = platform.system()
    if system == "Windows":
        device = f"Windows {platform.release()}"
    elif system == "Darwin":
        device = f"Mac OS {platform.mac_ver()[0]}"
    elif system == "Linux":
        device = f"Linux {platform.release()}"
    else:
        device = system
    # The platform helpers return "" when a value cannot be determined, which
    # would otherwise yield an empty label or one with a dangling space.
    return device.strip() or "Unknown"
def extract_data(self, input_string, data_type):
    """Extract the body of the first fenced code block tagged ``data_type``.

    The block starts at the first occurrence of three backticks immediately
    followed by ``data_type`` and runs to the next three backticks, or to the
    end of the string when the fence is never closed.

    Args:
        input_string: Text that may contain a fenced block.
        data_type: Fence tag to look for, e.g. ``"python"`` or ``"json"``.
            It is matched literally.

    Returns:
        str: The block content with surrounding whitespace stripped, or
        ``input_string`` unchanged when no such fence is present.
    """
    # Escape the tag so characters such as "+" or "." in it (e.g. "c++") are
    # not interpreted as regex syntax.
    pattern = f"```{re.escape(data_type)}" + r"(.*?)(```|$)"
    # re.DOTALL lets "." span newlines so multi-line blocks are captured.
    match = re.search(pattern, input_string, re.DOTALL)
    if match is None:
        return input_string
    return match.group(1).strip()
class TaskRunAgentResponse(BaseModel):
reasoning: str = Field(description="描述当前屏幕上的内容,考虑历史记录,然后描述您如何实现任务的逐步思考,一次从可用操作中选择一个操作。")
next_action: str = Field(
@ -96,53 +75,55 @@ class TaskRunAgentResponse(BaseModel):
"enum": Action
}
)
box_id: int = Field(description="要操作的框ID当next_action为left_click、right_click、double_click、hover时提供否则为None", default=None)
value: str = Field(description="仅当next_action为type时提供否则为None", default=None)
box_id: int = Field(description="要操作的框ID当next_action为left_click、right_click、double_click、hover时提供否则为None")
value: str = Field(description="仅当next_action为type时提供否则为None")
current_task_id: int = Field(description="请判断一下你正在完成第几个任务第一个任务是0")
system_prompt = """
prompt = """
### 目标 ###
你是一个自动化规划师需要完成用户的任务请你根据屏幕信息确定下一步操作以完成任务
你当前的任务是
{task_plan}
以下是用yolo检测的当前屏幕上的所有元素图标左上角的数字为box_id
你是一个任务执行者需要执行之前assistant返回的任务列表请你根据屏幕信息确定next_action如果任务完成把next_action设置为None
以下是当前屏幕上的所有元素图标左上角的数字为box_id
{screen_info}
请根据以下任务列表判断一下你正在执行第几个任务current_task_id第一个任务是0任务列表如下
{task_list}
##########
### 注意 ###
1. 每次应该只给出一个操作
2. 应该对当前屏幕进行分析通过查看历史记录反思已完成的工作然后描述您如何实现任务的逐步思考
3. "Next Action"中附上下一步操作预测
4. 不应包括其他操作例如键盘快捷键
5. 当任务完成时不要完成额外的操作你应该在json字段中说"Next Action": "None"
6. 任务涉及购买多个产品或浏览多个页面你应该将其分解为子目标并按照说明的顺序一个一个地完成每个子目标
7. 避免连续多次选择相同的操作/元素如果发生这种情况反思自己可能出了什么问题并预测不同的操作
8. 如果您收到登录信息页面或验证码页面的提示或者您认为下一步操作需要用户许可您应该在json字段中说"Next Action": "None"
9. 你只能使用鼠标和键盘与计算机进行交互
10. 你只能与桌面图形用户界面交互无法访问终端或应用程序菜单
11. 如果当前屏幕没有显示任何可操作的元素并且当前屏幕不能下滑请返回None
- 每次应该只给出一个操作告诉我要对哪个box_id进行操作输入什么内容或者滚动或者其他操作
- 应该对当前屏幕进行分析通过查看历史记录反思已完成的工作然后描述您如何实现任务的逐步思考
- 避免连续多次选择相同的操作/元素如果发生这种情况反思自己可能出了什么问题并预测不同的操作
- 任务不是连续的上一次是1下一次不一定是2你要根据next_action进行判断
- current_task_id 要在任务列表中找到不要随便写
- 当你觉得任务已经完成时请一定把next_action设置为'None'不然会重复执行
##########
### 输出格式 ###
```json
{{
"reasoning": str, # 描述当前屏幕上的内容,考虑历史记录,然后描述您如何实现任务的逐步思考,一次从可用操作中选择一个操作
"next_action": "action_type, action description" | "None" # 一次一个操作,简短精确地描述它
"box_id": n,
"reasoning": str, # 综合当前屏幕上的内容和历史记录,描述您是如何思考的
"next_action": str, # 要执行的动作
"box_id": int, # 要操作的框ID当next_action为left_click、right_click、double_click、hover时提供否则为None
"value": "xxx" # 仅当操作为type时提供value字段否则不包括value键
"current_task_id": int # 当前正在执行第几个任务第一个任务是0
}}
```
##########
### 案例 ###
任务列表
0. 打开浏览器
1. 搜索亚马逊
2. 点击第一个搜索结果
一个例子
```json
{{
"reasoning": "当前屏幕显示亚马逊的谷歌搜索结果在之前的操作中我已经在谷歌上搜索了亚马逊。然后我需要点击第一个搜索结果以转到amazon.com。",
"next_action": "left_click",
"box_id": m
"box_id": 35,
"current_task_id": 0
}}
```
@ -151,8 +132,9 @@ system_prompt = """
{{
"reasoning": "当前屏幕显示亚马逊的首页。没有之前的操作。因此,我需要在搜索栏中输入"Apple watch"",
"next_action": "type",
"box_id": n,
"value": "Apple watch"
"box_id": 27,
"value": "Apple watch",
"current_task_id": 1
}}
```
@ -160,7 +142,8 @@ system_prompt = """
```json
{{
"reasoning": "当前屏幕没有显示'提交'按钮,我需要向下滚动以查看按钮是否可用。",
"next_action": "scroll_down"
"next_action": "scroll_down",
"current_task_id": 2
}}
"""

View File

@ -57,6 +57,8 @@ def setup_state(state):
state["responses"] = {}
if "tools" not in state:
state["tools"] = {}
if "tasks" not in state:
state["tasks"] = []
if "only_n_most_recent_images" not in state:
state["only_n_most_recent_images"] = 2
if 'stop' not in state:
@ -79,17 +81,6 @@ def load_from_storage(filename: str) -> str | None:
print(f"Debug: Error loading {filename}: {e}")
return None
def save_to_storage(filename: str, data: str) -> None:
    """Save data to a file in the storage directory.

    The file is restricted to owner read/write (``0o600``) before any content
    is written, so the data is never on disk under more permissive default
    permissions.

    Args:
        filename: Name of the file, relative to ``CONFIG_DIR``.
        data: Text to write.

    Failures are reported with a debug print rather than raised, so saving
    stays best-effort for the caller.
    """
    try:
        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        file_path = CONFIG_DIR / filename
        # Create the file with restrictive permissions if it is missing, then
        # enforce them for a pre-existing file as well, before writing.
        file_path.touch(mode=0o600, exist_ok=True)
        file_path.chmod(0o600)
        file_path.write_text(data)
    except Exception as e:
        print(f"Debug: Error saving {filename}: {e}")
def format_json_content(json_content):
"""Format JSON content with reasoning and details"""
content_json = json.loads(json_content)
@ -137,8 +128,7 @@ def process_input(user_input, state, vision_agent_state):
# Add user message
state["messages"].append({"role": "user", "content": user_input})
state["chatbox_messages"].append({"role": "user", "content": user_input})
yield state["chatbox_messages"]
yield state["chatbox_messages"], []
# Process with agent
agent = vision_agent_state["agent"]
for _ in sampling_loop_sync(
@ -149,7 +139,23 @@ def process_input(user_input, state, vision_agent_state):
):
if state["stop"]:
return
# task_plan_agent first response
if len(state["messages"]) == 2:
task_list = json.loads(state["messages"][-1]["content"])["task_list"]
for task in task_list:
state["tasks"].append({
"status": "",
"task": task
})
else:
# Reset all tasks to pending status
for i in range(len(state["tasks"])):
state["tasks"][i]["status"] = ""
task_completed_number = json.loads(state["messages"][-1]["content"])["current_task_id"]
for i in range(task_completed_number+1):
state["tasks"][i]["status"] = ""
# Rebuild chatbox messages from the original messages
state["chatbox_messages"] = []
@ -169,7 +175,9 @@ def process_input(user_input, state, vision_agent_state):
"content": formatted_content
})
yield state["chatbox_messages"]
# 在返回结果前转换数据格式
tasks_2d = [[task["status"], task["task"]] for task in state["tasks"]]
yield state["chatbox_messages"], tasks_2d
def is_json_format(text):
try:
@ -180,7 +188,7 @@ def is_json_format(text):
def stop_app(state):
state["stop"] = True
return "App stopped"
return
def get_header_image_base64():
try:
@ -195,14 +203,7 @@ def get_header_image_base64():
print(f"Failed to load header image: {e}")
return None
def update_task_list(state):
    """Build the task table rows from ``state["tasks"]``.

    Each task dict contributes one ``[description, status]`` row. A missing
    ``"tasks"`` key gives an empty list, and a missing ``"description"`` gives
    an empty string.
    """
    # NOTE(review): both arms of the conditional below are the same empty
    # string as the code stands, so the status column never distinguishes
    # completed tasks - presumably marker characters were lost; confirm.
    return [
        [entry.get("description", ""), "" if entry.get("completed", False) else ""]
        for entry in state.get("tasks", [])
    ]
def run():
with gr.Blocks(theme=gr.themes.Default()) as demo:
gr.HTML("""
@ -281,11 +282,11 @@ def run():
with gr.Row():
with gr.Column(scale=2):
task_list = gr.Dataframe(
headers=["Status", "Task"],
headers=["status", "task"],
datatype=["str", "str"],
value=[],
label="Task List",
interactive=False )
interactive=False)
with gr.Column(scale=8):
chatbot = gr.Chatbot(
@ -318,9 +319,9 @@ def run():
vision_agent = VisionAgent(yolo_model_path=os.path.join(MODEL_DIR, "icon_detect", "model.pt"),
caption_model_path=os.path.join(MODEL_DIR, "icon_caption"))
vision_agent_state = gr.State({"agent": vision_agent})
submit_button.click(process_input, [chat_input, state, vision_agent_state], chatbot)
submit_button.click(process_input, [chat_input, state, vision_agent_state], [chatbot, task_list])
stop_button.click(stop_app, [state], None)
base_url.change(fn=update_base_url, inputs=[base_url, state], outputs=None)
demo.launch(server_name="0.0.0.0", server_port=7888)

View File

@ -1,10 +1,6 @@
import asyncio
import json
from typing import Any, cast
from anthropic.types.beta import (
BetaMessageParam,
BetaContentBlockParam,
BetaToolResultBlockParam,
BetaContentBlock
)
from gradio_ui.tools import ComputerTool, ToolCollection
@ -29,8 +25,5 @@ class AnthropicExecutor:
tool_result_content.append(
str(result)
)
messages.append({"role": "assistant", "content": "Run tool result:\n"+str(tool_result_content)})
if not tool_result_content:
return messages
return tool_result_content

View File

@ -3,6 +3,7 @@ Agentic sampling loop that calls the Anthropic API and local implementation of
"""
import base64
from io import BytesIO
import json
import cv2
from gradio_ui.agent.vision_agent import VisionAgent
from gradio_ui.tools.screen_capture import get_screenshot
@ -21,7 +22,7 @@ def sampling_loop_sync(
messages: list[BetaMessageParam],
vision_agent: VisionAgent,
screen_region: tuple[int, int, int, int]
):
):
"""
Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
"""
@ -30,17 +31,20 @@ def sampling_loop_sync(
executor = AnthropicExecutor()
task_run_agent = TaskRunAgent()
parsed_screen_result = parsed_screen(vision_agent, screen_region)
plan_list = task_plan_agent(messages=messages, parsed_screen_result=parsed_screen_result)
task_plan_agent(messages=messages, parsed_screen_result=parsed_screen_result)
yield
for plan in plan_list:
execute_task_plan(plan, vision_agent, task_run_agent, executor, messages, screen_region)
while True:
execute_result = execute_task_plan(vision_agent, task_run_agent, executor, messages, screen_region)
if execute_result['next_action'] == 'None':
break
yield
def execute_task_plan(plan, vision_agent, task_run_agent, executor, messages, screen_region):
def execute_task_plan(vision_agent, task_run_agent, executor, messages, screen_region):
parsed_screen_result = parsed_screen(vision_agent, screen_region)
tools_use_needed, __ = task_run_agent(task_plan=plan, parsed_screen_result=parsed_screen_result, messages=messages)
tools_use_needed, vlm_response_json = task_run_agent(parsed_screen_result=parsed_screen_result, messages=messages)
executor(tools_use_needed, messages)
return vlm_response_json
def parsed_screen(vision_agent: VisionAgent, screen_region: tuple[int, int, int, int] = None):
screenshot, screenshot_path = get_screenshot(screen_region)

BIN
resources/wxchat.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB