From 8433098673d8b5904f33727eb43860c306eedbc6 Mon Sep 17 00:00:00 2001 From: yuruo Date: Sat, 15 Mar 2025 11:38:42 +0800 Subject: [PATCH] delete verification agent --- gradio_ui/agent/verification_agent.py | 96 --------------------------- gradio_ui/app.py | 10 +-- gradio_ui/loop.py | 27 ++------ gradio_ui/tools/computer.py | 10 --- gradio_ui/tools/screen_capture.py | 6 +- 5 files changed, 11 insertions(+), 138 deletions(-) delete mode 100644 gradio_ui/agent/verification_agent.py diff --git a/gradio_ui/agent/verification_agent.py b/gradio_ui/agent/verification_agent.py deleted file mode 100644 index 7386373..0000000 --- a/gradio_ui/agent/verification_agent.py +++ /dev/null @@ -1,96 +0,0 @@ -import json -from pydantic import Field,BaseModel -from gradio_ui.agent.base_agent import BaseAgent -from xbrain.core.chat import run - -from gradio_ui.tools.computer import Action - -class VerificationAgent(BaseAgent): - def __call__(self, messages, parsed_screen_result): - messages.append( - {"role": "user", - "content": [ - {"type": "text", "text": "Image is the screenshot of the current screen"}, - { - "type": "image_url", - "image_url": {"url": f"data:image/png;base64,{parsed_screen_result['base64_image']}"} - } - ] - }) - response = run( - messages, - user_prompt=prompt.format(screen_info=str(parsed_screen_result['parsed_content_list']), action_list=str(Action)), - response_format=VerificationResponse - ) - return json.loads(response) - -class VerificationResponse(BaseModel): - verification_status: str = Field(description="验证状态", json_schema_extra={"enum": ["success", "error"]}) - verification_method: str = Field(description="验证方法") - reasoning: str = Field(description="描述您验证的逻辑") - failure_reason: str = Field(description="失败原因") - remedy_measures: list[str] = Field(description="补救措施") - -prompt = """ -### 目标 ### -你是自动化验证专家,负责确认每个操作后的预期结果是否达成,保证自动化流程可靠执行。 -以下是当前屏幕上的内容: -{screen_info} - -### 输入 ### -1. 操作信息:刚执行的操作类型和参数 -2. 屏幕状态:当前屏幕上的视觉元素和状态 -3. 预期结果:操作应该产生的效果 - -### 输出格式 ### -验证结果应采用以下JSON格式: -{{ - "验证状态": "成功/失败", - "验证方法": "使用的验证方法", - "证据": "支持验证结果的具体证据", - "失败原因": "如果失败,分析可能的原因", - "补救措施": [ - "再执行一次操作" - ], -}} - -### 验证方法 ### -1. **视觉验证**:识别特定UI元素是否出现或消失 - - 元素存在性:检查某元素是否存在 - - 元素状态:检查元素是否处于特定状态(激活、禁用等) - - 视觉变化:检查屏幕特定区域是否发生变化 - -2. **内容验证**:确认特定文本或数据是否正确 - - 文本匹配:页面上是否包含预期文本 - - 数据一致性:显示的数据是否符合预期 - - 计数验证:元素数量是否符合预期 - -3. **系统状态验证**:检查系统响应 - - 进程状态:特定进程是否运行 - - 文件变化:文件是否被创建、修改或删除 - - 网络活动:是否有特定网络请求或响应 - -### 验证策略 ### -- **重试机制**:指定最大重试次数和间隔时间 -- **渐进式验证**:先验证基础条件,再验证详细条件 -- **模糊匹配**:允许近似匹配而非精确匹配 -- **超时设置**:指定验证的最长等待时间 - -### 补救措施 ### -补救措施建议如下: -- 【推荐】可以再等待一段时间看看效果,因为上一个操作还没执行完成就开始了验证 -- 再一次操作 -- 检查是否存在其他验证方法,但是仅限于以下几个动作: -{action_list} -### 例子 ### -操作:点击"登录"按钮 -预期结果:登录成功并显示首页 -验证输出: -{{ - "verification_status": "success", - "verification_method": "视觉验证+内容验证", - "reasoning": "1. 检测到欢迎消息'你好,用户名' 2. 导航栏显示用户头像 3. URL已变更为首页地址", - "failure_reason": "无", - "remedy_measures": [], -}} -""" diff --git a/gradio_ui/app.py b/gradio_ui/app.py index 8b48510..8c03fa1 100644 --- a/gradio_ui/app.py +++ b/gradio_ui/app.py @@ -113,7 +113,8 @@ def process_input(user_input, state, vision_agent_state): for _ in sampling_loop_sync( model=state["model"], messages=state["messages"], - vision_agent = agent + vision_agent = agent, + screen_region=state.get("screen_region", None) ): if state["stop"]: return @@ -219,14 +220,14 @@ def run(): model = gr.Textbox( label="Model", value=state.value["model"], - placeholder="输入模型名称", + placeholder="Input model name", interactive=True, ) with gr.Column(): base_url = gr.Textbox( label="Base URL", value=state.value["base_url"], - placeholder="输入基础 URL", + placeholder="input base url", interactive=True ) with gr.Row(): @@ -239,8 +240,7 @@ def run(): ) with gr.Column(): - select_region_btn = gr.Button(value="Select Region", variant="primary") - + select_region_btn = gr.Button(value="Select Screen Region", variant="primary") def select_screen_region(state): from util.screen_selector import ScreenSelector region = ScreenSelector().get_selection() diff --git a/gradio_ui/loop.py b/gradio_ui/loop.py index 6eb1d80..ad852bf 100644 --- a/gradio_ui/loop.py +++ b/gradio_ui/loop.py @@ -21,7 +21,8 @@ def sampling_loop_sync( *, model: str, messages: list[BetaMessageParam], - vision_agent: VisionAgent + vision_agent: VisionAgent, + screen_region: tuple[int, int, int, int] ): """ Synchronous agentic sampling loop for the assistant/tool interaction of computer use. @@ -29,7 +30,6 @@ def sampling_loop_sync( print('in sampling_loop_sync, model:', model) task_plan_agent = TaskPlanAgent() executor = AnthropicExecutor() - verification_agent = VerificationAgent() task_run_agent = TaskRunAgent() parsed_screen_result = parsed_screen(vision_agent) plan_list = task_plan_agent(messages=messages, parsed_screen_result=parsed_screen_result) @@ -37,32 +37,15 @@ def sampling_loop_sync( for plan in plan_list: execute_task_plan(plan, vision_agent, task_run_agent, executor, messages) yield - sleep(5) - yield from verification_loop(vision_agent, verification_agent, executor, task_run_agent, messages) - - -def verification_loop(vision_agent, verification_agent, executor, task_run_agent, messages): - """verification agent will be called in the loop""" - while True: - # verification result - parsed_screen_result = parsed_screen(vision_agent) - verification_result = verification_agent(messages, parsed_screen_result) - yield - # if verification success, return result - if verification_result["verification_status"] == "success": - return - # if verification failed, execute remedy measures - elif verification_result["verification_status"] == "error": - execute_task_plan(verification_result["remedy_measures"], vision_agent, task_run_agent, executor, messages) - yield + def execute_task_plan(plan, vision_agent, task_run_agent, executor, messages): parsed_screen_result = parsed_screen(vision_agent) tools_use_needed, __ = task_run_agent(task_plan=plan, parsed_screen_result=parsed_screen_result, messages=messages) executor(tools_use_needed, messages) -def parsed_screen(vision_agent: VisionAgent): - screenshot, screenshot_path = get_screenshot() +def parsed_screen(vision_agent: VisionAgent, screen_region: tuple[int, int, int, int] = None): + screenshot, screenshot_path = get_screenshot(screen_region) response_json = {} response_json['parsed_content_list'] = vision_agent(str(screenshot_path)) response_json['width'] = screenshot.size[0] diff --git a/gradio_ui/tools/computer.py b/gradio_ui/tools/computer.py index 1fcbcca..30d9f40 100644 --- a/gradio_ui/tools/computer.py +++ b/gradio_ui/tools/computer.py @@ -23,7 +23,6 @@ Action = [ "right_click", "middle_click", "double_click", - "screenshot", "cursor_position", "hover", "wait", @@ -151,7 +150,6 @@ class ComputerTool(BaseAnthropicTool): "right_click", "double_click", "middle_click", - "screenshot", "cursor_position", "left_press", ): @@ -159,8 +157,6 @@ class ComputerTool(BaseAnthropicTool): raise ToolError(f"text is not accepted for {action}") if coordinate is not None: raise ToolError(f"coordinate is not accepted for {action}") - if action == "screenshot": - return await self.screenshot() elif action == "cursor_position": x, y = pyautogui.position() # 直接返回原始坐标,不进行缩放 @@ -194,12 +190,6 @@ class ComputerTool(BaseAnthropicTool): return ToolResult(output=f"Performed {action}") raise ToolError(f"Invalid action: {action}") - async def screenshot(self): - width, height = self.target_dimension["width"], self.target_dimension["height"] - screenshot, path = get_screenshot(resize=True, target_width=width, target_height=height) - time.sleep(0.7) # avoid async error as actions take time to complete - return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode()) - def padding_image(self, screenshot): """Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10.""" _, height = screenshot.size diff --git a/gradio_ui/tools/screen_capture.py b/gradio_ui/tools/screen_capture.py index 2642741..68bb277 100644 --- a/gradio_ui/tools/screen_capture.py +++ b/gradio_ui/tools/screen_capture.py @@ -6,17 +6,13 @@ from util import tool OUTPUT_DIR = "./tmp/outputs" -def get_screenshot(resize: bool = False, target_width: int = 1920, target_height: int = 1080): - """Capture screenshot by requesting from HTTP endpoint - returns native resolution unless resized""" +def get_screenshot(): output_dir = Path(OUTPUT_DIR) output_dir.mkdir(parents=True, exist_ok=True) path = output_dir / f"screenshot_{uuid4().hex}.png" - try: img_io = tool.capture_screen_with_cursor() screenshot = Image.open(img_io) - if resize and screenshot.size != (target_width, target_height): - screenshot = screenshot.resize((target_width, target_height)) screenshot.save(path) return screenshot, path except Exception as e: