mirror of
https://github.com/yuruotong1/autoMate.git
synced 2025-12-26 05:16:21 +08:00
delete verification agent
This commit is contained in:
parent
55e2876f81
commit
8433098673
@ -1,96 +0,0 @@
|
||||
import json
|
||||
from pydantic import Field,BaseModel
|
||||
from gradio_ui.agent.base_agent import BaseAgent
|
||||
from xbrain.core.chat import run
|
||||
|
||||
from gradio_ui.tools.computer import Action
|
||||
|
||||
class VerificationAgent(BaseAgent):
    """Agent that asks the model whether the previous action reached its goal."""

    def __call__(self, messages, parsed_screen_result):
        """Append the current screenshot to ``messages`` and request a verdict.

        Args:
            messages: Conversation history; a new user message carrying the
                screenshot is appended to it in place.
            parsed_screen_result: Mapping read for ``'base64_image'`` (embedded
                as a PNG data URL) and ``'parsed_content_list'`` (stringified
                into the prompt).

        Returns:
            The reply from ``run`` decoded with ``json.loads``.
        """
        encoded_png = parsed_screen_result['base64_image']
        screenshot_message = {
            "role": "user",
            "content": [
                {"type": "text", "text": "Image is the screenshot of the current screen"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{encoded_png}"},
                },
            ],
        }
        messages.append(screenshot_message)

        # Fill the module-level template with the parsed screen content and
        # the list of actions the remedy measures may use.
        filled_prompt = prompt.format(
            screen_info=str(parsed_screen_result['parsed_content_list']),
            action_list=str(Action),
        )
        # NOTE(review): `run` presumably returns a JSON string shaped like
        # VerificationResponse -- confirm against xbrain.core.chat.
        raw_reply = run(
            messages,
            user_prompt=filled_prompt,
            response_format=VerificationResponse,
        )
        return json.loads(raw_reply)
|
||||
|
||||
class VerificationResponse(BaseModel):
    """Structured reply schema used as ``response_format`` for verification.

    The ``description`` values are runtime strings and are kept verbatim.
    """

    # Outcome of the check. The Python type is a plain str; the allowed values
    # ("success" / "error") are only advertised through json_schema_extra.
    verification_status: str = Field(
        description="验证状态",
        json_schema_extra={"enum": ["success", "error"]},
    )
    # Which verification method was applied.
    verification_method: str = Field(
        description="验证方法",
    )
    # Explanation of the logic behind the verdict.
    reasoning: str = Field(
        description="描述您验证的逻辑",
    )
    # Why the verification failed.
    failure_reason: str = Field(
        description="失败原因",
    )
    # Follow-up steps to try when the verification failed.
    remedy_measures: list[str] = Field(
        description="补救措施",
    )
|
||||
|
||||
# Prompt template for the verification agent. It is rendered with
# ``str.format`` and expects two fields: ``screen_info`` and ``action_list``.
# Literal braces in the JSON samples are therefore doubled ({{ }}).
# The JSON keys documented here must stay identical to the field names of
# VerificationResponse (verification_status, verification_method, reasoning,
# failure_reason, remedy_measures), and the samples must be valid JSON
# (no trailing commas) so the model is not shown a contradictory format.
prompt = """
### 目标 ###
你是自动化验证专家,负责确认每个操作后的预期结果是否达成,保证自动化流程可靠执行。
以下是当前屏幕上的内容:
{screen_info}

### 输入 ###
1. 操作信息:刚执行的操作类型和参数
2. 屏幕状态:当前屏幕上的视觉元素和状态
3. 预期结果:操作应该产生的效果

### 输出格式 ###
验证结果应采用以下JSON格式:
{{
    "verification_status": "success 或 error",
    "verification_method": "使用的验证方法",
    "reasoning": "支持验证结果的具体证据",
    "failure_reason": "如果失败,分析可能的原因",
    "remedy_measures": [
        "再执行一次操作"
    ]
}}

### 验证方法 ###
1. **视觉验证**:识别特定UI元素是否出现或消失
   - 元素存在性:检查某元素是否存在
   - 元素状态:检查元素是否处于特定状态(激活、禁用等)
   - 视觉变化:检查屏幕特定区域是否发生变化

2. **内容验证**:确认特定文本或数据是否正确
   - 文本匹配:页面上是否包含预期文本
   - 数据一致性:显示的数据是否符合预期
   - 计数验证:元素数量是否符合预期

3. **系统状态验证**:检查系统响应
   - 进程状态:特定进程是否运行
   - 文件变化:文件是否被创建、修改或删除
   - 网络活动:是否有特定网络请求或响应

### 验证策略 ###
- **重试机制**:指定最大重试次数和间隔时间
- **渐进式验证**:先验证基础条件,再验证详细条件
- **模糊匹配**:允许近似匹配而非精确匹配
- **超时设置**:指定验证的最长等待时间

### 补救措施 ###
补救措施建议如下:
- 【推荐】可以再等待一段时间看看效果,因为上一个操作还没执行完成就开始了验证
- 再一次操作
- 检查是否存在其他验证方法,但是仅限于以下几个动作:
{action_list}
### 例子 ###
操作:点击"登录"按钮
预期结果:登录成功并显示首页
验证输出:
{{
    "verification_status": "success",
    "verification_method": "视觉验证+内容验证",
    "reasoning": "1. 检测到欢迎消息'你好,用户名' 2. 导航栏显示用户头像 3. URL已变更为首页地址",
    "failure_reason": "无",
    "remedy_measures": []
}}
"""
|
||||
@ -113,7 +113,8 @@ def process_input(user_input, state, vision_agent_state):
|
||||
for _ in sampling_loop_sync(
|
||||
model=state["model"],
|
||||
messages=state["messages"],
|
||||
vision_agent = agent
|
||||
vision_agent = agent,
|
||||
screen_region=state.get("screen_region", None)
|
||||
):
|
||||
if state["stop"]:
|
||||
return
|
||||
@ -219,14 +220,14 @@ def run():
|
||||
model = gr.Textbox(
|
||||
label="Model",
|
||||
value=state.value["model"],
|
||||
placeholder="输入模型名称",
|
||||
placeholder="Input model name",
|
||||
interactive=True,
|
||||
)
|
||||
with gr.Column():
|
||||
base_url = gr.Textbox(
|
||||
label="Base URL",
|
||||
value=state.value["base_url"],
|
||||
placeholder="输入基础 URL",
|
||||
placeholder="input base url",
|
||||
interactive=True
|
||||
)
|
||||
with gr.Row():
|
||||
@ -239,8 +240,7 @@ def run():
|
||||
)
|
||||
|
||||
with gr.Column():
|
||||
select_region_btn = gr.Button(value="Select Region", variant="primary")
|
||||
|
||||
select_region_btn = gr.Button(value="Select Screen Region", variant="primary")
|
||||
def select_screen_region(state):
|
||||
from util.screen_selector import ScreenSelector
|
||||
region = ScreenSelector().get_selection()
|
||||
|
||||
@ -21,7 +21,8 @@ def sampling_loop_sync(
|
||||
*,
|
||||
model: str,
|
||||
messages: list[BetaMessageParam],
|
||||
vision_agent: VisionAgent
|
||||
vision_agent: VisionAgent,
|
||||
screen_region: tuple[int, int, int, int]
|
||||
):
|
||||
"""
|
||||
Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
|
||||
@ -29,7 +30,6 @@ def sampling_loop_sync(
|
||||
print('in sampling_loop_sync, model:', model)
|
||||
task_plan_agent = TaskPlanAgent()
|
||||
executor = AnthropicExecutor()
|
||||
verification_agent = VerificationAgent()
|
||||
task_run_agent = TaskRunAgent()
|
||||
parsed_screen_result = parsed_screen(vision_agent)
|
||||
plan_list = task_plan_agent(messages=messages, parsed_screen_result=parsed_screen_result)
|
||||
@ -37,32 +37,15 @@ def sampling_loop_sync(
|
||||
for plan in plan_list:
|
||||
execute_task_plan(plan, vision_agent, task_run_agent, executor, messages)
|
||||
yield
|
||||
sleep(5)
|
||||
yield from verification_loop(vision_agent, verification_agent, executor, task_run_agent, messages)
|
||||
|
||||
|
||||
def verification_loop(vision_agent, verification_agent, executor, task_run_agent, messages, max_attempts=10):
    """Verify the last action repeatedly, applying remedy measures on failure.

    This is a generator: it yields (no value) after every verification call and
    after every remedy execution, so the caller can interleave its own work
    between steps.

    Args:
        vision_agent: Passed to ``parsed_screen`` and ``execute_task_plan``.
        verification_agent: Callable invoked as
            ``verification_agent(messages, parsed_screen_result)``; its result
            is indexed with ``"verification_status"`` and ``"remedy_measures"``.
        executor: Forwarded to ``execute_task_plan``.
        task_run_agent: Forwarded to ``execute_task_plan``.
        messages: Conversation history shared with the agents.
        max_attempts: Upper bound on verification rounds. ``None`` restores the
            previous unbounded behaviour. The default of 10 is an arbitrary
            safety cap so a verification that never succeeds (or a status that
            is neither "success" nor "error") cannot re-query the model forever.
    """
    attempts_made = 0
    while max_attempts is None or attempts_made < max_attempts:
        attempts_made += 1
        # Re-parse the screen each round so the verdict reflects the current state.
        parsed_screen_result = parsed_screen(vision_agent)
        verification_result = verification_agent(messages, parsed_screen_result)
        yield
        status = verification_result["verification_status"]
        # Verification succeeded: nothing left to do.
        if status == "success":
            return
        # Verification failed: run the suggested remedy, then verify again.
        if status == "error":
            # NOTE(review): remedy_measures is a list, yet it is passed as one
            # plan; confirm task_run_agent accepts a list as task_plan.
            execute_task_plan(verification_result["remedy_measures"], vision_agent, task_run_agent, executor, messages)
            yield
        # Any other status falls through and is retried, as before.
    print('in verification_loop, giving up after attempts:', attempts_made)
|
||||
|
||||
|
||||
def execute_task_plan(plan, vision_agent, task_run_agent, executor, messages):
    """Turn one plan into tool calls against a fresh screen parse and run them.

    Args:
        plan: Forwarded to ``task_run_agent`` as ``task_plan``.
        vision_agent: Used to parse the current screen.
        task_run_agent: Callable returning a 2-item result; only the first
            item is used.
        executor: Callable invoked with that first item and ``messages``.
        messages: Conversation history passed to both callables.
    """
    current_screen = parsed_screen(vision_agent)
    agent_output = task_run_agent(
        task_plan=plan,
        parsed_screen_result=current_screen,
        messages=messages,
    )
    # The second element of the agent's result is not needed here.
    pending_tool_calls, _ignored = agent_output
    executor(pending_tool_calls, messages)
|
||||
|
||||
def parsed_screen(vision_agent: VisionAgent):
|
||||
screenshot, screenshot_path = get_screenshot()
|
||||
def parsed_screen(vision_agent: VisionAgent, screen_region: tuple[int, int, int, int] = None):
|
||||
screenshot, screenshot_path = get_screenshot(screen_region)
|
||||
response_json = {}
|
||||
response_json['parsed_content_list'] = vision_agent(str(screenshot_path))
|
||||
response_json['width'] = screenshot.size[0]
|
||||
|
||||
@ -23,7 +23,6 @@ Action = [
|
||||
"right_click",
|
||||
"middle_click",
|
||||
"double_click",
|
||||
"screenshot",
|
||||
"cursor_position",
|
||||
"hover",
|
||||
"wait",
|
||||
@ -151,7 +150,6 @@ class ComputerTool(BaseAnthropicTool):
|
||||
"right_click",
|
||||
"double_click",
|
||||
"middle_click",
|
||||
"screenshot",
|
||||
"cursor_position",
|
||||
"left_press",
|
||||
):
|
||||
@ -159,8 +157,6 @@ class ComputerTool(BaseAnthropicTool):
|
||||
raise ToolError(f"text is not accepted for {action}")
|
||||
if coordinate is not None:
|
||||
raise ToolError(f"coordinate is not accepted for {action}")
|
||||
if action == "screenshot":
|
||||
return await self.screenshot()
|
||||
elif action == "cursor_position":
|
||||
x, y = pyautogui.position()
|
||||
# 直接返回原始坐标,不进行缩放
|
||||
@ -194,12 +190,6 @@ class ComputerTool(BaseAnthropicTool):
|
||||
return ToolResult(output=f"Performed {action}")
|
||||
raise ToolError(f"Invalid action: {action}")
|
||||
|
||||
async def screenshot(self):
    """Capture the screen at the tool's target size and return it base64-encoded.

    Reads ``self.target_dimension["width"]`` / ``["height"]``, asks
    ``get_screenshot`` for a resized capture, waits briefly, and wraps the
    saved file's bytes in a ``ToolResult`` as ``base64_image``.
    """
    # Local import keeps this method self-contained; the rest of the file's
    # import block is not visible from here.
    import asyncio

    width = self.target_dimension["width"]
    height = self.target_dimension["height"]
    # Only the saved file path is needed; the image object is discarded.
    _, path = get_screenshot(resize=True, target_width=width, target_height=height)
    # Actions take time to complete, so pause before replying. Awaiting
    # asyncio.sleep yields to the event loop, whereas time.sleep inside a
    # coroutine would block every other task for the whole pause.
    await asyncio.sleep(0.7)
    return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode())
|
||||
|
||||
def padding_image(self, screenshot):
|
||||
"""Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10."""
|
||||
_, height = screenshot.size
|
||||
|
||||
@ -6,17 +6,13 @@ from util import tool
|
||||
|
||||
OUTPUT_DIR = "./tmp/outputs"
|
||||
|
||||
def get_screenshot(resize: bool = False, target_width: int = 1920, target_height: int = 1080):
|
||||
"""Capture screenshot by requesting from HTTP endpoint - returns native resolution unless resized"""
|
||||
def get_screenshot():
|
||||
output_dir = Path(OUTPUT_DIR)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
path = output_dir / f"screenshot_{uuid4().hex}.png"
|
||||
|
||||
try:
|
||||
img_io = tool.capture_screen_with_cursor()
|
||||
screenshot = Image.open(img_io)
|
||||
if resize and screenshot.size != (target_width, target_height):
|
||||
screenshot = screenshot.resize((target_width, target_height))
|
||||
screenshot.save(path)
|
||||
return screenshot, path
|
||||
except Exception as e:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user