update new code

This commit is contained in:
yuruo
2025-03-26 16:11:02 +08:00
parent f2476d53a3
commit 22b3ca9373
62 changed files with 1339 additions and 2983 deletions

BIN
auto_control/.DS_Store vendored

Binary file not shown.

View File

@@ -1,8 +0,0 @@
class BaseAgent:
    """Common base for chat-style agents; subclasses override chat()."""

    def __init__(self, *args, **kwargs):
        # Subclasses replace this with their own system prompt text.
        self.SYSTEM_PROMPT = ""

    def chat(self, messages):
        """Default no-op chat; concrete agents implement the real model call."""
        return None

View File

@@ -1,37 +0,0 @@
from argparse import Action
import json
from auto_control.agent.base_agent import BaseAgent
from xbrain.core.chat import run
class FewShotGenerateAgent(BaseAgent):
    """Turns a recorded action trace into few-shot learning examples via the VLM."""

    def __call__(self, action_list):
        """Build one text+screenshot message pair per recorded action and query the model."""
        content_list = []
        for step_no, action in enumerate(action_list, 1):
            # Strip the heavy screenshot payload from the textual step dump.
            textual_action = {k: v for k, v in action.items() if k != 'base64_image'}
            content_list.append(
                {"type": "text", "text": f"Step {step_no}:\n{json.dumps(textual_action, indent=2)}"}
            )
            content_list.append(
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{action['base64_image']}"}
                }
            )
        messages = [{"role": "user", "content": content_list}]
        user_prompt = prompt.format(actions=json.dumps(action_list, indent=2))
        response = run(
            messages,
            user_prompt=user_prompt)
        return response
# System prompt for FewShotGenerateAgent.  Passed through `prompt.format(...)`;
# NOTE(review): it contains no `{...}` placeholders, so the `actions` kwarg
# supplied by the caller is silently ignored — confirm whether the action dump
# was meant to be embedded here.
prompt = """Please analyze this sequence of user input actions and create few-shot learning examples.
The recorded actions include mouse clicks, keyboard inputs, and special key presses, along with their timing and UI context.
Please create structured examples that show:
1. The user's intent and context
2. The sequence of actions needed
3. Important UI elements involved
4. Any timing or order dependencies
Format each example to demonstrate the complete interaction pattern."""

View File

@@ -1,73 +0,0 @@
import json
from pydantic import BaseModel, Field
from auto_control.agent.base_agent import BaseAgent
from xbrain.core.chat import run
from auto_control.tools.computer import Action
class TaskPlanAgent(BaseAgent):
    """Breaks the user's request into an ordered task list using the current screen."""

    def __call__(self, messages, parsed_screen_result):
        """Rewrite the last user turn to include the screenshot, then ask for a plan."""
        screenshot_url = f"data:image/png;base64,{parsed_screen_result['base64_image']}"
        last_text = messages[-1]["content"]
        # Replace the plain-text turn with a multimodal (text + image) turn.
        messages[-1] = {
            "role": "user",
            "content": [
                {"type": "text", "text": last_text},
                {"type": "image_url", "image_url": {"url": screenshot_url}},
            ],
        }
        response = run(messages, user_prompt=system_prompt.format(action_list=str(Action)), response_format=TaskPlanResponse)
        print("task_plan_agent response: ", response)
        return json.loads(response)
# Structured output schema for the planner; the Chinese field descriptions are
# part of the response-format schema handed to the model.
class TaskPlanResponse(BaseModel):
    reasoning: str = Field(description="描述您规划任务的逻辑")
    task_list: list[str] = Field(description="任务列表")
system_prompt = """
### 目标 ###
你是自动化操作规划专家,根据屏幕内容和用户需求,规划精确可执行的操作序列。
### 输入 ###
1. 用户需求:文本描述形式的任务目标
2. 当前环境:屏幕上可见的元素和状态
### 输出格式 ###
操作序列应采用以下JSON格式
[
{{
"reasoning": "描述您规划任务的逻辑",
"task_plan": ["任务1", "任务2", "任务3"]
}}
]
任务中的操作应该仅包含:
{action_list}
### 限制 ###
- 不要说点击xx坐标这样用户无法理解应该说点击地址栏、搜索框、输入按钮等
### 例子 ###
输入获取AI新闻
输出:
[
{{
"reasoning": "看到有一个地址栏所以应该在地址栏输入https://www.baidu.com",
"task_plan": ["在地址栏输入https://www.baidu.com"]
}},
{{
"reasoning": "这是百度页面看到有一个搜索框所以应该在搜索框输入AI最新新闻",
"task_plan": ["在搜索框输入AI最新新闻"]
}},
{{
"reasoning": "看到有一个搜索按钮,所以应该点击搜索按钮",
"task_plan": ["点击搜索按钮"]
}}
]
"""

View File

@@ -1,187 +0,0 @@
import json
import uuid
from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock, BetaMessageParam, BetaUsage
from pydantic import Field, create_model
from auto_control.agent.base_agent import BaseAgent
from xbrain.core.chat import run
from auto_control.tools.computer import Action
class TaskRunAgent(BaseAgent):
    """Executes one step of the current task plan per call.

    Each call appends the latest annotated screenshot to the conversation,
    asks the VLM which single action to take next, and converts the reply
    into Anthropic-style tool-use blocks for the executor.
    """

    def __init__(self):
        # Directory for debug artifacts; kept for parity with the other agents.
        self.OUTPUT_DIR = "./tmp/outputs"

    def __call__(self, parsed_screen_result, messages):
        """Return (BetaMessage carrying tool-use blocks, raw VLM response dict)."""
        messages.append(
            {"role": "user",
             "content": [
                 {"type": "text", "text": "Image is the screenshot of the current screen"},
                 {
                     "type": "image_url",
                     "image_url": {"url": f"data:image/png;base64,{parsed_screen_result['base64_image']}"}
                 }
             ]}
        )
        # messages[1] holds the task-plan agent's JSON reply for this session.
        task_list = json.loads(messages[1]['content'])['task_list']
        # Number the tasks from 0 to match the current_task_id convention.
        formatted_task_list = "\n".join([f"{i}.{task}" for i, task in enumerate(task_list)])
        system_prompt = prompt.format(task_list=formatted_task_list)
        vlm_response = run(
            messages,
            user_prompt=system_prompt,
            response_format=create_dynamic_response_model(parsed_screen_result)
        )
        vlm_response_json = json.loads(vlm_response)
        response_content = [BetaTextBlock(text=vlm_response_json["reasoning"], type='text')]
        # Handle cursor movement based on box_id.
        if "box_id" in vlm_response_json:
            # Actions that act without a target position never need a cursor move.
            action_types_without_cursor = ["None", "key", "type", "scroll_down", "scroll_up", "cursor_position", "wait"]
            if vlm_response_json["box_id"] != -1 and vlm_response_json["next_action"] not in action_types_without_cursor:
                # Move cursor to the center of the identified element.
                element = self.find_element_by_id(parsed_screen_result, vlm_response_json["box_id"])
                # Guard: the VLM may pick a box id that is not on screen;
                # previously this dereferenced None and crashed.
                if element is not None:
                    bbox = element.coordinates
                    box_centroid_coordinate = [
                        int((bbox[0] + bbox[2]) / 2),
                        int((bbox[1] + bbox[3]) / 2)
                    ]
                    move_cursor_block = BetaToolUseBlock(
                        id=f'toolu_{uuid.uuid4()}',
                        input={'action': 'mouse_move', 'coordinate': box_centroid_coordinate},
                        name='computer',
                        type='tool_use'
                    )
                    response_content.append(move_cursor_block)
            elif vlm_response_json["box_id"] == -1 and len(vlm_response_json.get("coordinates") or []) == 2:
                # No known element: move the cursor to the raw coordinates instead.
                move_cursor_block = BetaToolUseBlock(
                    id=f'toolu_{uuid.uuid4()}',
                    input={'action': 'mouse_move', 'coordinate': vlm_response_json["coordinates"]},
                    name='computer',
                    type='tool_use'
                )
                response_content.append(move_cursor_block)
        if vlm_response_json["next_action"] == "None":
            # The model signals completion with the literal string "None".
            print("Task paused/completed.")
        elif vlm_response_json["next_action"] == "type":
            sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
                                                 input={'action': vlm_response_json["next_action"], 'text': vlm_response_json["value"]},
                                                 name='computer', type='tool_use')
            response_content.append(sim_content_block)
        else:
            sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
                                                 input={'action': vlm_response_json["next_action"]},
                                                 name='computer', type='tool_use')
            response_content.append(sim_content_block)
        response_message = BetaMessage(id=f'toolu_{uuid.uuid4()}', content=response_content, model='', role='assistant', type='message', stop_reason='tool_use', usage=BetaUsage(input_tokens=0, output_tokens=0))
        return response_message, vlm_response_json

    def find_element_by_id(self, parsed_screen_result, box_id):
        """Return the detected UI element with matching id, or None if absent."""
        for element in parsed_screen_result["parsed_content_list"]:
            if element.element_id == box_id:
                return element
        return None
def create_dynamic_response_model(parsed_screen_result):
    """Build the TaskRunAgentResponse model whose box_id enum matches the
    elements detected on the current screen (plus -1 for "not found")."""
    valid_box_ids = [element.element_id for element in parsed_screen_result['parsed_content_list']]
    valid_box_ids.append(-1)
    field_definitions = {
        'reasoning': (str, Field(
            description="描述当前屏幕上的内容,考虑历史记录,然后说出你要这么做的理由。"
        )),
        'next_action': (str, Field(
            description="选择一个操作类型如果找不到合适的操作请选择None",
            json_schema_extra={"enum": Action}
        )),
        'box_id': (int, Field(
            description="要操作的框ID如果框ID不存在就返回-1",
            json_schema_extra={"enum": valid_box_ids}
        )),
        'coordinates': (list[int], Field(
            description="当 box_id 为-1时直接返回要操作对象的坐标只返回x,y这2个整数"
        )),
        'value': (str, Field(
            description="仅当next_action为type时提供否则为None"
        )),
        'current_task_id': (int, Field(
            description="请判断一下你正在完成第几个任务第一个任务是0"
        )),
    }
    return create_model('TaskRunAgentResponse', **field_definitions)
prompt = """
### 目标 ###
你是一个任务执行者。请你根据屏幕截图和【所有元素】确定接下来要做什么如果任务完成把next_action设置为None
请根据以下任务列表判断一下你正在执行第几个任务current_task_id第一个任务是0任务列表如下
{task_list}
##########
### 注意 ###
- 要结合用户传入的屏幕图片观察其中的 box_id 框框和标号确定要操作哪一个box_id如果没有合适的请返回-1然后通过coordinates给出要操作对象的坐标。
- 每次应该只给出一个操作告诉我要对哪个box_id进行操作、输入什么内容或者滚动或者其他操作。
- 应该对当前屏幕进行分析,通过查看历史记录反思已完成的工作,然后描述您如何实现任务的逐步思考。
- 避免连续多次选择相同的操作/元素,如果发生这种情况,反思自己,可能出了什么问题,并预测不同的操作。
- 任务不是连续的上一次是1下一次不一定是2你要根据next_action进行判断。
- current_task_id 要在任务列表中找到,不要随便写。
- 当你觉得任务已经完成时请一定把next_action设置为'None',不然会重复执行。
- 涉及到输入type、key操作时其上一步操作一定是点击输入框操作。
##########
### 输出格式 ###
```json
{{
"reasoning": str, # 综合当前屏幕上的内容和历史记录,描述您是如何思考的。
"next_action": str, # 要执行的动作。
"box_id": int, # 要操作的框ID当next_action为left_click、right_click、double_click、hover时提供否则为None
"value": "xxx" # 仅当操作为type时提供value字段否则不包括value键
"current_task_id": int # 当前正在执行第几个任务第一个任务是0,
"coordinates": list[int] # 仅当box_id为-1时提供返回要操作对象的坐标只返回x,y这2个整数
}}
```
##########
### 案例 ###
任务列表:
0. 打开浏览器
1. 搜索亚马逊
2. 点击第一个搜索结果
一个例子:
```json
{{
"reasoning": "当前屏幕显示亚马逊的谷歌搜索结果在之前的操作中我已经在谷歌上搜索了亚马逊。然后我需要点击第一个搜索结果以转到amazon.com。",
"next_action": "left_click",
"box_id": 35,
"current_task_id": 0
}}
```
另一个例子:
```json
{{
"reasoning": "当前屏幕显示亚马逊的首页。没有之前的操作。因此,我需要在搜索栏中输入"Apple watch"",
"next_action": "type",
"box_id": 27,
"value": "Apple watch",
"current_task_id": 1
}}
```
另一个例子:
```json
{{
"reasoning": "当前屏幕没有显示'提交'按钮,我需要向下滚动以查看按钮是否可用。",
"next_action": "scroll_down",
"current_task_id": 2
}}
"""

View File

@@ -1,92 +0,0 @@
from typing import List
import cv2
from ultralytics import YOLO
import supervision as sv
import numpy as np
from pydantic import BaseModel
class UIElement(BaseModel):
    # Index of the detection within one analyzed frame (0-based).
    element_id: int
    # Bounding box as [x1, y1, x2, y2] in image pixels.
    coordinates: list[float]
class VisionAgent:
    """Detects interactive UI elements in screenshots with a YOLO model."""

    def __init__(self, yolo_model_path: str):
        """
        Initialize the vision agent.

        Parameters:
            yolo_model_path: Path to YOLO model
        """
        # Load the YOLO model once; reused across calls.
        self.yolo_model = YOLO(yolo_model_path)
        # Results of the most recent analyze_image call.
        self.elements: List[UIElement] = []

    def __call__(self, image_path: str) -> List[UIElement]:
        """Process an image from file path."""
        image = cv2.imread(image_path)
        if image is None:
            # Include the offending path so failures are diagnosable
            # (the old message was an f-string with no placeholders).
            raise FileNotFoundError(f"Vision agent: Failed to read image: {image_path}")
        return self.analyze_image(image)

    def _reset_state(self):
        """Clear previous analysis results."""
        self.elements = []

    def analyze_image(self, image: np.ndarray) -> List[UIElement]:
        """
        Process an image through all computer vision pipelines.

        Args:
            image: Input image in BGR format (OpenCV default)
        Returns:
            List of detected UI elements with annotations
        """
        self._reset_state()
        boxes = self._detect_objects(image)
        for idx in range(len(boxes)):
            self.elements.append(UIElement(element_id=idx, coordinates=boxes[idx]))
        return self.elements

    def _detect_objects(self, image: np.ndarray) -> "np.ndarray | list":
        """Run detection and drop boxes fully contained in larger kept boxes.

        Returns an (N, 4) array of xyxy boxes, or an empty list when nothing
        is detected.  (The previous annotation claimed a tuple, which this
        function never returned.)
        """
        results = self.yolo_model(image)[0]
        detections = sv.Detections.from_ultralytics(results)
        boxes = detections.xyxy
        if len(boxes) == 0:
            return []
        # Sort descending by area so any box contained in an earlier kept box
        # can be discarded in a single pass.
        areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
        sorted_indices = np.argsort(-areas)
        sorted_boxes = boxes[sorted_indices]
        keep_sorted = []
        for i in range(len(sorted_boxes)):
            box_a = sorted_boxes[i]
            contained = False
            for j in keep_sorted:
                box_b = sorted_boxes[j]
                if (box_b[0] <= box_a[0] and box_b[1] <= box_a[1] and
                        box_b[2] >= box_a[2] and box_b[3] >= box_a[3]):
                    contained = True
                    break
            if not contained:
                keep_sorted.append(i)
        # Map back to original indices.
        keep_indices = sorted_indices[keep_sorted]
        return boxes[keep_indices]

View File

@@ -1,352 +0,0 @@
"""
python app.py --windows_host_url localhost:8006 --omniparser_server_url localhost:8000
"""
import json
import os
from pathlib import Path
import argparse
import gradio as gr
from auto_control.agent.vision_agent import VisionAgent
from auto_control.loop import (
sampling_loop_sync,
)
import base64
from xbrain.utils.config import Config
from util.download_weights import OMNI_PARSER_DIR
CONFIG_DIR = Path("~/.anthropic").expanduser()
API_KEY_FILE = CONFIG_DIR / "api_key"
INTRO_TEXT = '''
Base on Omniparser to control desktop!
'''
def parse_arguments():
    """CLI options: the windows host and omniparser server endpoints."""
    parser = argparse.ArgumentParser(description="Gradio App")
    for flag, default in (
        ("--windows_host_url", "localhost:8006"),
        ("--omniparser_server_url", "localhost:8000"),
    ):
        parser.add_argument(flag, type=str, default=default)
    return parser.parse_args()
args = parse_arguments()
def setup_state(state):
    """Populate session *state* with config-derived and default values.

    Returns the tuple of component values Gradio uses to refresh the UI:
    (model, base_url, api_key, chatbox messages, task-list rows).
    """
    # Pull OpenAI settings from the persisted config when present.
    config = Config()
    state["api_key"] = config.OPENAI_API_KEY if config.OPENAI_API_KEY else ""
    state["base_url"] = config.OPENAI_BASE_URL if config.OPENAI_BASE_URL else "https://api.openai.com/v1"
    state["model"] = config.OPENAI_MODEL if config.OPENAI_MODEL else "gpt-4o"
    # Session bookkeeping defaults; only set when the key is missing so a
    # reload does not wipe an in-progress session.
    session_defaults = {
        "messages": [],
        "chatbox_messages": [],
        "auth_validated": False,
        "responses": {},
        "tools": {},
        "tasks": [],
        "only_n_most_recent_images": 2,
        "stop": False,
    }
    for key, value in session_defaults.items():
        if key not in state:
            state[key] = value
    # update state
    return (
        state["model"],          # model textbox
        state["base_url"],       # base_url textbox
        state["api_key"],        # api_key textbox
        state["chatbox_messages"],  # chatbot
        [[task["status"], task["task"]] for task in state["tasks"]]  # task_list
    )
def load_from_storage(filename: str) -> str | None:
    """Load data from a file in the storage directory.

    Returns the stripped file contents, or None when the file is missing,
    empty, or unreadable.
    """
    try:
        file_path = CONFIG_DIR / filename
        if file_path.exists():
            data = file_path.read_text().strip()
            if data:
                return data
    except Exception as e:
        # Best-effort loader: report which file failed — the old message
        # printed a literal "(unknown)" instead of the filename.
        print(f"Debug: Error loading {filename}: {e}")
    return None
def format_json_content(json_content):
    """Split a JSON reply into a headline (reasoning) and a collapsible dump."""
    payload = json.loads(json_content)
    headline = f'<h3>{payload["reasoning"]}</h3>'
    pretty = json.dumps(payload, indent=4, ensure_ascii=False)
    collapsible = f'<br/> <details> <summary>Detail</summary> <pre>{pretty}</pre> </details>'
    return headline, collapsible
def format_message_content(content):
    """Format message content for gradio chatbox display.

    Returns (formatted_html, reasoning_or_None); the reasoning headline is
    extracted when the content carries a JSON payload.
    """
    # Multimodal content: concatenate rendered images and text parts.
    if isinstance(content, list):
        rendered = ""
        extracted_reasoning = None
        for part in content:
            if part["type"] == "image_url":
                rendered += f'<br/><img style="width: 100%;" src="{part["image_url"]["url"]}">'
            elif part["type"] == "text":
                text = part["text"]
                if is_json_format(text):
                    reasoning, details = format_json_content(text)
                    extracted_reasoning = reasoning
                    rendered += details
                else:
                    rendered += text
        return rendered, extracted_reasoning
    # Plain string content: pretty-print when it is JSON.
    if is_json_format(content):
        reasoning, _ = format_json_content(content)
        return json.dumps(json.loads(content), indent=4, ensure_ascii=False), reasoning
    return content, None
def process_input(user_input, state, vision_agent_state):
    """Gradio generator callback: run the agent loop for one user request.

    Yields (chatbox_messages, task_rows) once after submitting the request
    and again after every agent step so the UI refreshes incrementally.
    """
    # Reset the stop flag
    if state["stop"]:
        state["stop"] = False
    # Configure API
    config = Config()
    config.set_openai_config(base_url=state["base_url"], api_key=state["api_key"], model=state["model"])
    # Add user message
    state["messages"].append({"role": "user", "content": user_input})
    state["chatbox_messages"].append({"role": "user", "content": user_input})
    yield state["chatbox_messages"], []
    # Process with agent
    agent = vision_agent_state["agent"]
    for _ in sampling_loop_sync(
        model=state["model"],
        messages=state["messages"],
        vision_agent=agent,
        screen_region=state.get("screen_region", None)
    ):
        if state["stop"]:
            state["chatbox_messages"].append({"role": "user", "content": "Stop !"})
            return
        # task_plan_agent first response
        if len(state["messages"]) == 2:
            # First yield after planning: seed the task list from the plan JSON.
            task_list = json.loads(state["messages"][-1]["content"])["task_list"]
            for task in task_list:
                state["tasks"].append({
                    "status": "",
                    "task": task
                })
        else:
            # Reset all tasks to pending status
            # NOTE(review): all three branches below write the same empty
            # status string — progress markers (e.g. done/pending icons)
            # appear to have been lost; confirm the intended values.
            for i in range(len(state["tasks"])):
                state["tasks"][i]["status"] = ""
            task_completed_number = json.loads(state["messages"][-1]["content"])["current_task_id"]
            if task_completed_number > len(state["tasks"]) + 1:
                for i in range(len(state["tasks"])):
                    state["tasks"][i]["status"] = ""
            else:
                for i in range(task_completed_number + 1):
                    state["tasks"][i]["status"] = ""
        # Rebuild chatbox messages from the original messages
        state["chatbox_messages"] = []
        for message in state["messages"]:
            formatted_content, json_reasoning = format_message_content(message["content"])
            # Add json reasoning as a separate message if exists
            if json_reasoning:
                state["chatbox_messages"].append({
                    "role": message["role"],
                    "content": json_reasoning
                })
            # Add the formatted content
            state["chatbox_messages"].append({
                "role": message["role"],
                "content": formatted_content
            })
        # Convert task state to 2-D rows for the Dataframe before yielding.
        tasks_2d = [[task["status"], task["task"]] for task in state["tasks"]]
        yield state["chatbox_messages"], tasks_2d
def is_json_format(text):
    """Return True when *text* parses as JSON.

    json.loads raises ValueError (JSONDecodeError) for malformed text and
    TypeError for non-string input; the previous bare `except` also hid
    unrelated errors such as KeyboardInterrupt.
    """
    try:
        json.loads(text)
        return True
    except (ValueError, TypeError):
        return False
def stop_app(state):
    """Flag the running sampling loop to stop at its next checkpoint."""
    state["stop"] = True
    return None
def get_header_image_base64():
    """Return the header image as a data URL, or None when unavailable."""
    try:
        # Image lives at <repo>/imgs/header_bar_thin.png, relative to this file.
        image_path = Path(__file__).parent.parent / "imgs" / "header_bar_thin.png"
        encoded = base64.b64encode(image_path.read_bytes()).decode()
        return f'data:image/png;base64,{encoded}'
    except Exception as e:
        print(f"Failed to load header image: {e}")
        return None
def run():
    """Assemble the Gradio UI, wire up callbacks, launch, and block until Ctrl-C."""
    with gr.Blocks(theme=gr.themes.Default()) as demo:
        # Page-level CSS tweaks.
        gr.HTML("""
        <style>
        .no-padding {
            padding: 0 !important;
        }
        .no-padding > div {
            padding: 0 !important;
        }
        .markdown-text p {
            font-size: 18px; /* Adjust the font size as needed */
        }
        </style>
        """)
        # Per-session state dict; setup_state seeds it from the saved config.
        state = gr.State({})
        setup_state(state.value)
        header_image = get_header_image_base64()
        if header_image:
            gr.HTML(f'<img src="{header_image}" alt="autoMate Header" width="100%">', elem_classes="no-padding")
            gr.HTML('<h1 style="text-align: center; font-weight: normal;">autoMate</h1>')
        else:
            gr.Markdown("# autoMate")
        if not os.getenv("HIDE_WARNING", False):
            gr.Markdown(INTRO_TEXT, elem_classes="markdown-text")
        # --- Settings panel: model / base URL / API key plus screen-region picker.
        with gr.Accordion("Settings", open=True):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        with gr.Column():
                            model = gr.Textbox(
                                label="Model",
                                value=state.value["model"],
                                placeholder="Input model name",
                                interactive=True,
                            )
                        with gr.Column():
                            base_url = gr.Textbox(
                                label="Base URL",
                                value=state.value["base_url"],
                                placeholder="input base url",
                                interactive=True
                            )
                    with gr.Row():
                        api_key = gr.Textbox(
                            label="API Key",
                            type="password",
                            value=state.value["api_key"],
                            placeholder="Paste your API key here",
                            interactive=True,
                        )
                with gr.Column():
                    select_region_btn = gr.Button(value="Select Screen Region", variant="primary")

                    def select_screen_region(state):
                        # Let the user drag-select a capture region; stored for later screenshots.
                        from util.screen_selector import ScreenSelector
                        region = ScreenSelector().get_selection()
                        if region:
                            state["screen_region"] = region
                            return f"Selected region: {region}"
                        return "Selection cancelled"
                    select_region_btn.click(fn=select_screen_region, inputs=[state], outputs=[gr.Textbox(label="Region Info")])
        # --- Chat input row.
        with gr.Row():
            with gr.Column(scale=8):
                chat_input = gr.Textbox(show_label=False, placeholder="Type a message to send to Omniparser + X ...", container=False)
            with gr.Column(scale=1, min_width=50):
                submit_button = gr.Button(value="Send", variant="primary")
            with gr.Column(scale=1, min_width=50):
                stop_button = gr.Button(value="Stop", variant="secondary")
        # --- Task list alongside the chat history.
        with gr.Row():
            with gr.Column(scale=2):
                task_list = gr.Dataframe(
                    headers=["status", "task"],
                    datatype=["str", "str"],
                    value=[],
                    label="Task List",
                    interactive=False)
            with gr.Column(scale=8):
                chatbot = gr.Chatbot(
                    label="Chatbot History",
                    autoscroll=True,
                    height=580,
                    type="messages")

        def update_model(model, state):
            state["model"] = model

        def update_api_key(api_key_value, state):
            state["api_key"] = api_key_value

        def update_base_url(base_url, state):
            state["base_url"] = base_url

        def clear_chat(state):
            # Reset message-related state
            state["messages"] = []
            state["chatbox_messages"] = []
            state["responses"] = {}
            state["tools"] = {}
            state["tasks"] = []
            return state["chatbox_messages"]
        model.change(fn=update_model, inputs=[model, state], outputs=None)
        api_key.change(fn=update_api_key, inputs=[api_key, state], outputs=None)
        chatbot.clear(fn=clear_chat, inputs=[state], outputs=[chatbot])
        # One shared vision agent instance, passed to callbacks via component state.
        vision_agent = VisionAgent(yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt"))
        vision_agent_state = gr.State({"agent": vision_agent})
        submit_button.click(process_input, [chat_input, state, vision_agent_state], [chatbot, task_list])
        stop_button.click(stop_app, [state], None)
        base_url.change(fn=update_base_url, inputs=[base_url, state], outputs=None)
        # Re-sync widgets from state on every page load.
        demo.load(
            setup_state,
            inputs=[state],
            outputs=[model, base_url, api_key, chatbot, task_list]
        )
    demo.launch(server_name="0.0.0.0", quiet=True, server_port=7888, prevent_thread_lock=True)
    BLUE = "\033[34m"
    BOLD = "\033[1m"
    UNDERLINE = "\033[4m"
    RESET = "\033[0m"
    print(f"\n\n🚀 Server is running at: {BLUE}{BOLD}{UNDERLINE}http://127.0.0.1:7888{RESET}")
    # launch() above is non-blocking; keep the process alive until interrupted.
    import time
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # NOTE(review): the characters before "closing server" are mojibake in
        # the original source (likely a lost emoji/CJK text) — fix the literal
        # separately; left unchanged here.
        print("\n�� closing server")

View File

@@ -1,29 +0,0 @@
import asyncio
from typing import Any, cast
from anthropic.types.beta import (
BetaContentBlock
)
from auto_control.tools import ComputerTool, ToolCollection
class AnthropicExecutor:
    """Runs the tool-use blocks of an assistant response synchronously."""

    def __init__(self):
        # Single computer-control tool, registered under its declared name.
        self.tool_collection = ToolCollection(
            ComputerTool()
        )

    def __call__(self, response, messages):
        """Execute every tool_use block in *response*; return their string results."""
        tool_result_content: list[str] = []
        for content_block in cast(list[BetaContentBlock], response.content):
            if content_block.type != "tool_use":
                continue
            # Tools are async; drive each call to completion on a fresh event loop.
            outcome = asyncio.run(self.tool_collection.run(
                name=content_block.name,
                tool_input=cast(dict[str, Any], content_block.input),
            ))
            tool_result_content.append(str(outcome))
        return tool_result_content

View File

@@ -1,136 +0,0 @@
"""
Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
"""
import base64
from io import BytesIO
import cv2
from auto_control.agent.vision_agent import VisionAgent
from auto_control.tools.screen_capture import get_screenshot
from anthropic.types.beta import (BetaMessageParam)
from auto_control.agent.task_plan_agent import TaskPlanAgent
from auto_control.agent.task_run_agent import TaskRunAgent
from auto_control.executor.anthropic_executor import AnthropicExecutor
import numpy as np
from PIL import Image
OUTPUT_DIR = "./tmp/outputs"
def sampling_loop_sync(
    *,
    model: str,
    messages: list[BetaMessageParam],
    vision_agent: VisionAgent,
    screen_region: tuple[int, int, int, int]
):
    """
    Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
    """
    print('in sampling_loop_sync, model:', model)
    planner = TaskPlanAgent()
    runner = TaskRunAgent()
    tool_executor = AnthropicExecutor()
    # First pass: capture the screen and let the planner produce a task list.
    screen_state = parsed_screen(vision_agent, screen_region)
    planner(messages=messages, parsed_screen_result=screen_state)
    yield
    # Then execute one action per iteration until the model signals "None".
    while True:
        step_result = execute_task_plan(vision_agent, runner, tool_executor, messages, screen_region)
        if step_result['next_action'] == 'None':
            break
        yield
def execute_task_plan(vision_agent, task_run_agent, executor, messages, screen_region):
    """Capture the screen, pick the next action, run it; return the VLM reply dict."""
    screen_state = parsed_screen(vision_agent, screen_region)
    tool_blocks, vlm_reply = task_run_agent(parsed_screen_result=screen_state, messages=messages)
    executor(tool_blocks, messages)
    return vlm_reply
def parsed_screen(vision_agent: VisionAgent, screen_region: tuple[int, int, int, int] = None):
    """Screenshot the region, detect UI elements, and return an annotated bundle.

    The returned dict carries the element list, the screen dimensions, the
    annotated PIL image, and its base64-encoded PNG.
    """
    screenshot, screenshot_path = get_screenshot(screen_region)
    elements = vision_agent(str(screenshot_path))
    annotated = draw_elements(screenshot, elements)
    buffer = BytesIO()
    annotated.save(buffer, format="PNG")
    return {
        'parsed_content_list': elements,
        'width': screenshot.size[0],
        'height': screenshot.size[1],
        'image': annotated,
        'base64_image': base64.b64encode(buffer.getvalue()).decode("utf-8"),
    }
def _element_color(element_id):
    """Derive a stable pseudo-random (r, g, b) color from an element id.

    Hashing the id keeps a given element's color consistent across frames.
    NOTE(review): the tuple is applied to a BGR image below, so on screen the
    channels appear swapped relative to (r, g, b) — confirm whether intended.
    """
    import hashlib
    hash_value = int(hashlib.md5(str(element_id).encode()).hexdigest(), 16)
    r = (hash_value & 0xFF0000) >> 16
    g = (hash_value & 0x00FF00) >> 8
    b = hash_value & 0x0000FF
    return (r, g, b)


def draw_elements(screenshot, parsed_content_list):
    """
    Convert PIL image to OpenCV compatible format and draw bounding boxes.

    Args:
        screenshot: PIL Image object
        parsed_content_list: list containing bounding box information
    Returns:
        PIL image with drawn bounding boxes
    """
    # convert PIL image to opencv format
    opencv_image = np.array(screenshot)
    opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_RGB2BGR)
    # draw bounding boxes
    for element in parsed_content_list:
        bbox = element.coordinates
        x1, y1, x2, y2 = bbox
        # convert coordinates to integers
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        # Color helper hoisted to module level — it was re-defined on every
        # loop iteration before, for no benefit.
        color = _element_color(element.element_id)
        cv2.rectangle(opencv_image, (x1, y1), (x2, y2), color, 1)
        # Dynamically adjust font size based on box size: smaller boxes get
        # smaller text so the id label stays inside them.
        box_width = x2 - x1
        box_height = y2 - y1
        base_font_size = 0.5
        min_dimension = min(box_width, box_height)
        if min_dimension < 30:
            font_size = max(0.3, base_font_size * min_dimension / 30)
        else:
            font_size = base_font_size
        text = str(element.element_id)
        (text_width, text_height), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_size, 1)
        # Position text at the top-left corner with small padding.
        text_x = x1 + 2
        text_y = y1 + text_height + 2
        # Semi-transparent black backdrop behind the label for legibility
        # (alpha blending via a full-image overlay copy).
        overlay = opencv_image.copy()
        cv2.rectangle(overlay,
                      (text_x - 2, text_y - text_height - 2),
                      (text_x + text_width + 2, text_y + 2),
                      (0, 0, 0), -1)
        alpha = 0.5
        cv2.addWeighted(overlay, alpha, opencv_image, 1 - alpha, 0, opencv_image)
        cv2.putText(opencv_image, text,
                    (text_x, text_y),
                    cv2.FONT_HERSHEY_SIMPLEX, font_size, color, 1)
    # convert opencv image format back to PIL format
    opencv_image = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2RGB)
    pil_image = Image.fromarray(opencv_image)
    return pil_image

View File

@@ -1,11 +0,0 @@
from .base import ToolResult
from .collection import ToolCollection
from .computer import ComputerTool
from .screen_capture import get_screenshot

# Public API of the tools package.  `__all__` (lowercase) is the name Python
# honors for `from ... import *`; the previous `__ALL__` spelling was ignored,
# and it must hold strings, not the objects themselves.
__all__ = [
    "ComputerTool",
    "ToolCollection",
    "ToolResult",
    "get_screenshot",
]

View File

@@ -1,65 +0,0 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, fields, replace
from typing import Any
from anthropic.types.beta import BetaToolUnionParam
class BaseAnthropicTool(metaclass=ABCMeta):
    """Abstract base class for Anthropic-defined tools."""

    @abstractmethod
    def __call__(self, **kwargs) -> Any:
        """Executes the tool with the given arguments."""
        ...

    @abstractmethod
    def to_params(
        self,
    ) -> BetaToolUnionParam:
        # Concrete tools return the JSON-serializable tool schema (including
        # a "name" key) that is sent to the API and used for dispatch.
        raise NotImplementedError
@dataclass(kw_only=True, frozen=True)
class ToolResult:
"""Represents the result of a tool execution."""
output: str | None = None
error: str | None = None
base64_image: str | None = None
system: str | None = None
def __bool__(self):
return any(getattr(self, field.name) for field in fields(self))
def __add__(self, other: "ToolResult"):
def combine_fields(
field: str | None, other_field: str | None, concatenate: bool = True
):
if field and other_field:
if concatenate:
return field + other_field
raise ValueError("Cannot combine tool results")
return field or other_field
return ToolResult(
output=combine_fields(self.output, other.output),
error=combine_fields(self.error, other.error),
base64_image=combine_fields(self.base64_image, other.base64_image, False),
system=combine_fields(self.system, other.system),
)
def replace(self, **kwargs):
"""Returns a new ToolResult with the given fields replaced."""
return replace(self, **kwargs)
# Marker subclass: same fields as ToolResult (typically with `error` set);
# lets callers distinguish failures by type.
class ToolFailure(ToolResult):
    """A ToolResult that represents a failure."""
class ToolError(Exception):
    """Raised when a tool encounters an error."""

    def __init__(self, message):
        # Forward to Exception so str(e) and e.args carry the message; the
        # previous version skipped super().__init__, leaving str(e) empty.
        super().__init__(message)
        self.message = message

View File

@@ -1,34 +0,0 @@
"""Collection classes for managing multiple tools."""
from typing import Any
from anthropic.types.beta import BetaToolUnionParam
from .base import (
BaseAnthropicTool,
ToolError,
ToolFailure,
ToolResult,
)
class ToolCollection:
    """A collection of anthropic-defined tools."""

    def __init__(self, *tools: BaseAnthropicTool):
        # Index tools by the name each one advertises in its API schema.
        self.tools = tools
        self.tool_map = {}
        for tool in tools:
            self.tool_map[tool.to_params()["name"]] = tool

    def to_params(
        self,
    ) -> list[BetaToolUnionParam]:
        """Schemas for every registered tool, in registration order."""
        return [t.to_params() for t in self.tools]

    async def run(self, *, name: str, tool_input: dict[str, Any]) -> ToolResult:
        """Dispatch *tool_input* to the tool registered as *name*."""
        tool = self.tool_map.get(name)
        if not tool:
            return ToolFailure(error=f"Tool {name} is invalid")
        try:
            return await tool(**tool_input)
        except ToolError as e:
            return ToolFailure(error=e.message)

View File

@@ -1,200 +0,0 @@
import base64
import time
from typing import Literal, TypedDict
from PIL import Image
from anthropic.types.beta import BetaToolComputerUse20241022Param
from .base import BaseAnthropicTool, ToolError, ToolResult
from .screen_capture import get_screenshot
import pyautogui
import pyperclip
import platform
OUTPUT_DIR = "./tmp/outputs"
TYPING_DELAY_MS = 12
TYPING_GROUP_SIZE = 50
# Action types the VLM may choose from.  These names are embedded into the
# agents' prompts/response schemas and dispatched by ComputerTool.__call__,
# so the lists must stay in sync.  "None" (the string, not the literal)
# signals task completion / pause rather than an executable action.
Action = [
    "key",
    "type",
    "mouse_move",
    "left_click",
    "left_click_drag",
    "right_click",
    "middle_click",
    "double_click",
    "cursor_position",
    "hover",
    "wait",
    "scroll_up",
    "scroll_down",
    "None"
]
class Resolution(TypedDict):
    # Screen size in physical pixels.
    width: int
    height: int

# Common display standards used as scaling targets, keyed by standard name.
MAX_SCALING_TARGETS: dict[str, Resolution] = {
    "XGA": Resolution(width=1024, height=768),  # 4:3
    "WXGA": Resolution(width=1280, height=800),  # 16:10
    "FWXGA": Resolution(width=1366, height=768),  # ~16:9
}
class ComputerToolOptions(TypedDict):
    # Display geometry advertised to the Anthropic computer-use API
    # (see ComputerTool.options / to_params).
    display_height_px: int
    display_width_px: int
    display_number: int | None
def chunks(s: str, chunk_size: int) -> list[str]:
    """Split *s* into consecutive pieces of at most *chunk_size* characters."""
    pieces = []
    for start in range(0, len(s), chunk_size):
        pieces.append(s[start:start + chunk_size])
    return pieces
class ComputerTool(BaseAnthropicTool):
    """
    A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer.
    Adapted for Windows using 'pyautogui'.
    """

    name: Literal["computer"] = "computer"
    api_type: Literal["computer_20241022"] = "computer_20241022"
    width: int
    height: int
    display_num: int | None

    _screenshot_delay = 2.0

    @property
    def options(self) -> ComputerToolOptions:
        """Display geometry reported to the API (real primary-screen size)."""
        return {
            "display_width_px": self.width,
            "display_height_px": self.height,
            "display_number": self.display_num,
        }

    def to_params(self) -> BetaToolComputerUse20241022Param:
        """Serialize this tool's definition for the Anthropic API."""
        return {"name": self.name, "type": self.api_type, **self.options}

    def __init__(self):
        super().__init__()
        self.display_num = None
        self.offset_x = 0
        self.offset_y = 0
        # Use the actual primary-screen resolution; coordinates are not scaled.
        self.width, self.height = pyautogui.size()
        # Map X11-style key names to the names pyautogui understands.
        self.key_conversion = {"Page_Down": "pagedown",
                               "Page_Up": "pageup",
                               "Super_L": "win",
                               "Escape": "esc"}

    async def __call__(
        self,
        *,
        action,
        text: str | None = None,
        coordinate: tuple[int, int] | None = None,
        **kwargs,
    ):
        """Perform one UI action.

        Args:
            action: One of the names in the module-level ``Action`` list.
            text: Key combination (e.g. "ctrl+c") or literal text; only valid
                for the "key"/"type" actions.
            coordinate: Target (x, y) in screen pixels; only valid for
                "mouse_move"/"left_click_drag".

        Returns:
            ToolResult describing what was performed.

        Raises:
            ToolError: For invalid argument combinations or unknown actions.
        """
        print(f"action: {action}, text: {text}, coordinate: {coordinate},")
        if action in ("mouse_move", "left_click_drag"):
            if coordinate is None:
                raise ToolError(f"coordinate is required for {action}")
            if text is not None:
                raise ToolError(f"text is not accepted for {action}")
            if not isinstance(coordinate, (list, tuple)) or len(coordinate) != 2:
                raise ToolError(f"{coordinate} must be a tuple of length 2")
            # Negative values are deliberately allowed (multi-monitor layouts
            # can place secondary screens at negative coordinates), so only
            # the element type is validated here.
            if not all(isinstance(i, int) for i in coordinate):
                raise ToolError(f"{coordinate} must be a tuple of ints")
            x, y = coordinate
            print(f"mouse move to {x}, {y}")
            if action == "mouse_move":
                pyautogui.moveTo(x, y)
                return ToolResult(output=f"Moved mouse to ({x}, {y})")
            elif action == "left_click_drag":
                current_x, current_y = pyautogui.position()
                pyautogui.dragTo(x, y, duration=0.5)
                return ToolResult(output=f"Dragged mouse from ({current_x}, {current_y}) to ({x}, {y})")
        if action in ("key", "type"):
            if text is None:
                raise ToolError(f"text is required for {action}")
            if coordinate is not None:
                raise ToolError(f"coordinate is not accepted for {action}")
            if not isinstance(text, str):
                # BUG FIX: ToolError takes its message positionally; the old
                # code passed an unsupported ``output=`` keyword here, which
                # would have raised TypeError instead of the intended error.
                raise ToolError(f"{text} must be a string")
            if action == "key":
                # Handle key combinations: press every key in order, then
                # release in reverse so modifiers wrap the main key.
                keys = text.split('+')
                for key in keys:
                    key = self.key_conversion.get(key.strip(), key.strip())
                    key = key.lower()
                    pyautogui.keyDown(key)
                for key in reversed(keys):
                    key = self.key_conversion.get(key.strip(), key.strip())
                    key = key.lower()
                    pyautogui.keyUp(key)
                return ToolResult(output=f"Pressed keys: {text}")
            elif action == "type":
                # default click before type TODO: check if this is needed
                # Type via the clipboard (fast and IME-safe), saving and
                # restoring the user's previous clipboard contents.
                clipboard_data = pyperclip.paste()
                pyperclip.copy(text)
                if platform.system() == 'Darwin':
                    pyautogui.hotkey('command', 'v', interval=0.1)
                else:  # TODO: double check what works on windows
                    pyautogui.hotkey('ctrl', 'v')
                # Copy old data back to clipboard
                pyperclip.copy(clipboard_data)
                return ToolResult(output=text)
        if action in (
            "left_click",
            "right_click",
            "double_click",
            "middle_click",
            "cursor_position",
            "left_press",
        ):
            if text is not None:
                raise ToolError(f"text is not accepted for {action}")
            if coordinate is not None:
                raise ToolError(f"coordinate is not accepted for {action}")
            if action == "cursor_position":
                x, y = pyautogui.position()
                # Return raw screen coordinates without scaling.
                return ToolResult(output=f"X={x},Y={y}")
            else:
                # Click actions operate at the current cursor position; move
                # the mouse first with a separate "mouse_move" action.
                if action == "left_click":
                    pyautogui.click()
                elif action == "right_click":
                    pyautogui.rightClick()
                    # Wait 5 seconds for a context menu to appear.
                    time.sleep(5)
                elif action == "middle_click":
                    pyautogui.middleClick()
                elif action == "double_click":
                    pyautogui.doubleClick()
                elif action == "left_press":
                    # Press-and-hold for one second.
                    pyautogui.mouseDown()
                    time.sleep(1)
                    pyautogui.mouseUp()
                return ToolResult(output=f"Performed {action}")
        if action in ("scroll_up", "scroll_down"):
            # Fixed-step scroll; positive is up, negative is down.
            if action == "scroll_up":
                pyautogui.scroll(100)
            elif action == "scroll_down":
                pyautogui.scroll(-100)
            return ToolResult(output=f"Performed {action}")
        if action == "hover":
            # No-op: the cursor stays wherever a prior mouse_move left it.
            return ToolResult(output=f"Performed {action}")
        if action == "wait":
            time.sleep(1)
            return ToolResult(output=f"Performed {action}")
        raise ToolError(f"Invalid action: {action}")

    def padding_image(self, screenshot):
        """Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10."""
        _, height = screenshot.size
        new_width = height * 16 // 10
        padding_image = Image.new("RGB", (new_width, height), (255, 255, 255))
        # padding to top left
        padding_image.paste(screenshot, (0, 0))
        return padding_image

View File

@@ -1,41 +0,0 @@
import base64
from io import BytesIO
from pathlib import Path
from uuid import uuid4
from PIL import Image
import pyautogui
from .base import ToolError
from util import tool
OUTPUT_DIR = "./tmp/outputs"
def get_screenshot(screen_region=None, is_cursor=True, is_base64=False):
    """Capture the screen and persist it as a PNG file.

    Args:
        screen_region: Optional (x1, y1, x2, y2) box; when given, everything
            outside the box is blacked out (image dimensions are unchanged).
        is_cursor: When True, capture via util.tool so the mouse cursor is
            included; otherwise use a plain pyautogui screenshot.
        is_base64: When True, return (base64_string, path) instead of
            (PIL.Image, path).

    Returns:
        Tuple of the image (or its base64 encoding) and the saved file path.
        BUG FIX: the image is now always written to *path* — previously it
        was only saved when ``is_base64`` was True, so callers that recorded
        the returned path (e.g. the input listener) got a path to a file
        that did not exist.

    Raises:
        ToolError: If capturing or saving the screenshot fails.
    """
    output_dir = Path(OUTPUT_DIR)
    output_dir.mkdir(parents=True, exist_ok=True)
    # Unique filename per capture so concurrent callers never collide.
    path = output_dir / f"screenshot_{uuid4().hex}.png"
    try:
        if is_cursor:
            img_io = tool.capture_screen_with_cursor()
        else:
            pyautogui_screenshot = pyautogui.screenshot()
            img_io = BytesIO()
            pyautogui_screenshot.save(img_io, 'PNG')
        screenshot = Image.open(img_io)
        # If screen_region is provided and valid, keep only that region and
        # black out the rest (a mask the same size as the full screen).
        if screen_region and len(screen_region) == 4:
            black_mask = Image.new("RGBA", screenshot.size, (0, 0, 0, 255))
            x1, y1, x2, y2 = screen_region
            region = screenshot.crop((x1, y1, x2, y2))
            # Paste the region onto the black mask at its original position.
            black_mask.paste(region, (x1, y1, x2, y2))
            # Use the modified image as screenshot
            screenshot = black_mask
        # Always save, so the returned path points at a real file.
        screenshot.save(path)
        if is_base64:
            with open(path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8'), path
        return screenshot, path
    except Exception as e:
        raise ToolError(f"Failed to capture screenshot: {str(e)}")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 77 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 251 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 86 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.7 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.8 KiB

BIN
imgs/user.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 392 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

BIN
imgs/xiaohong.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 327 KiB

14
main.py
View File

@@ -1,9 +1,7 @@
from ui.main import main
from util import download_weights
def run():
    """Launcher entry point: fetch required weights, then start the UI.

    Calls util.download_weights.download() before handing control to the
    UI's main() — presumably so model weights exist on first run; confirm
    against util.download_weights.
    """
    download_weights.download()
    main()


if __name__ == "__main__":
    run()
"""
Run script for the AutoMate application
"""
from src.main import main
if __name__ == "__main__":
main()

View File

@@ -1,18 +1,7 @@
# torch
# torchvision
# easyocr
supervision==0.18.0
# transformers
ultralytics==8.3.70
numpy==1.26.4
gradio
pyautogui==0.9.54
anthropic[bedrock,vertex]>=0.37.1
pyxbrain==1.1.31
timm
einops==0.8.0
modelscope
pynput
lap
pyqt6==6.8.1
keyboard==0.13.5
pyqt6
pyautogui==0.9.54
pillow==11.1.0
keyboard
mouse

3
src/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
AutoMate - An application for demonstrating and automating tasks
"""

3
src/assets/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Assets (images, icons, etc.) for the AutoMate application
"""

3
src/core/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Core functionality for the AutoMate application
"""

View File

@@ -0,0 +1,227 @@
"""
Conversation manager module for handling dialog flow and states
"""
import json
import time
from PyQt6.QtCore import QObject, QThread, QTimer
from src.core.input_listener import InputListener
class ConversationManager(QObject):
    """
    Manages conversation state and process user interactions.

    State machine: "greeting" -> "ask_for_demo" -> "task_demonstration"
    -> "ready". While recording, keyboard/mouse actions arrive via
    InputListener and are appended to ``task_demonstration``.
    """
    def __init__(self, chat_area, mini_window):
        """
        Initialize the conversation manager
        Args:
            chat_area: ChatArea widget to display messages
            mini_window: MiniWindow for demonstration mode
        """
        super().__init__()
        self.chat_area = chat_area
        self.mini_window = mini_window
        # Initialize state
        self.conversation_state = "greeting"
        # Recorded actions/messages for the current demonstration.
        self.task_demonstration = []
        self.is_recording = False
        # Accumulates printable keystrokes so the mini window can show the
        # text typed so far.
        self.text_buffer = ""
        self.last_keypress_time = 0
        # Start the conversation
        self.start_conversation()

    def start_conversation(self):
        """Initialize the conversation with a greeting"""
        greeting = "Hello! I'm Xiao Hong, 23 years old, recently graduated from East China Normal University " + \
                   "with a Computer Science degree. I'm skilled in data analysis and document processing, " + \
                   "and have a positive and detail-oriented personality. Looking forward to working with you!"
        self.chat_area.add_message("Xiao Hong", greeting)
        self.chat_area.add_message("System", "Please enter your response...")

    def process_message(self, message):
        """
        Process incoming user message based on conversation state
        Args:
            message: Text message from user
        """
        # Add user message to chat
        self.chat_area.add_message("You", message, True)
        # Process message based on current state
        if self.conversation_state == "greeting":
            self.handle_greeting_response(message)
        elif self.conversation_state == "ask_for_demo":
            self.handle_demo_request(message)
        elif self.conversation_state == "task_demonstration" and self.is_recording:
            self.handle_task_demonstration(message)
        elif self.conversation_state == "ready":
            self.handle_ready_state(message)

    def handle_greeting_response(self, message):
        """Handle user's response to the initial greeting"""
        response = "Nice to meet you! I heard you want to demonstrate a task for me, " + \
                   "so I can learn and help you with similar tasks in the future. When would you like to start?"
        self.chat_area.add_message("Xiao Hong", response)
        self.conversation_state = "ask_for_demo"

    def handle_demo_request(self, message):
        """Handle user's response to the demo request"""
        # Simple keyword check for consent; anything else is treated as "not yet".
        if any(keyword in message.lower() for keyword in ["can", "yes", "now", "start", "demo"]):
            response = "Great! I'll minimize the window but keep a small control in the corner. " + \
                       "Click 'Finish Demo' when you're done, and I'll record your steps."
            self.chat_area.add_message("Xiao Hong", response)
            self.conversation_state = "task_demonstration"
            self.is_recording = True
            # Delay 1 second before starting recording mode
            QTimer.singleShot(1000, self.start_recording_mode)
        else:
            response = "No problem, just let me know whenever you're ready to demonstrate. I'll be here."
            self.chat_area.add_message("Xiao Hong", response)

    def analyze_action(self, action):
        """
        Analyze user actions during demonstration
        Args:
            action: Dict containing action data
        """
        self.task_demonstration.append(action)
        # Initialize status text
        status_text = f"Action detected: {action}"
        # Format display based on action type
        if action["type"] == "mouse":
            status_text = f"Mouse action: {action['event']} at position: {action['position']}"
            self.text_buffer = ""
        elif action["type"] == "keyboard":
            current_time = time.time()
            # Process keyboard input
            # NOTE(review): relies on pynput's repr of printable keys being
            # a quoted single character like "'a'" — confirm against the
            # pinned pynput version.
            key_str = str(action["event"])
            # Handle printable characters
            if len(key_str) == 3 and key_str.startswith("'") and key_str.endswith("'"):
                self.text_buffer += key_str[1]
            # Handle special keys
            elif "key.space" in key_str.lower():
                self.text_buffer += " "
            elif "key.enter" in key_str.lower() or "return" in key_str.lower():
                # Enter finalizes the buffered text; show it and reset.
                status_text = f"Keyboard input completed: \"{self.text_buffer}\""
                self.update_mini_window_status(status_text)
                self.text_buffer = ""
                return
            elif "key.backspace" in key_str.lower() and self.text_buffer:
                self.text_buffer = self.text_buffer[:-1]
            # Display buffer if timeout occurred
            if current_time - self.last_keypress_time > 2.0 and self.text_buffer:
                status_text = f"Keyboard input: \"{self.text_buffer}\""
            else:
                status_text = f"Keyboard action: {action['event']} (current input: \"{self.text_buffer}\")"
            self.last_keypress_time = current_time
        # Update mini window status
        self.update_mini_window_status(status_text)

    def update_mini_window_status(self, text):
        """
        Update the status text in the mini window
        Args:
            text: Status text to display
        """
        if hasattr(self.mini_window, 'status_label'):
            self.mini_window.status_label.setText(text)

    def start_recording_mode(self):
        """Start recording user interactions"""
        # Call to parent window to minimize
        # NOTE(review): QObject always defines parent(), so this hasattr
        # check is always True; it only works because MainWindow replaces
        # ``parent`` with a lambda returning the window — confirm before
        # refactoring.
        if hasattr(self, 'parent'):
            self.parent().showMinimized()
        # Show mini window
        self.mini_window.show()
        self.chat_area.add_message("System", "Recording your demonstration...")
        # Create input listener
        self.keyboard_mouse_listen = InputListener()
        self.keyboard_mouse_listen.action_detected.connect(self.analyze_action)
        # Set up thread: the listener lives on a worker thread and quits it
        # when terminated is emitted.
        self.listen_thread = QThread()
        self.keyboard_mouse_listen.terminated.connect(self.listen_thread.quit)
        self.keyboard_mouse_listen.moveToThread(self.listen_thread)
        self.listen_thread.started.connect(self.keyboard_mouse_listen.start_listen)
        # Start thread
        self.listen_thread.start()

    def finish_demonstration(self):
        """Complete the demonstration recording process"""
        # Clean up
        self.keyboard_mouse_listen.stop_listen()
        # Restore main window (see NOTE in start_recording_mode about parent)
        if hasattr(self, 'parent'):
            self.parent().showNormal()
        # Hide mini window
        self.mini_window.hide()
        self.is_recording = False
        self.save_task_demonstration()
        # Show summary
        action_count = len(self.task_demonstration)
        response = f"I've successfully learned this task! Recorded and analyzed {action_count} key actions. " + \
                   "Feel free to assign similar tasks to me in the future. 😊"
        self.chat_area.add_message("Xiao Hong", response)
        self.conversation_state = "ready"

    def handle_task_demonstration(self, message):
        """
        Handle messages during task demonstration
        Args:
            message: User message
        """
        self.task_demonstration.append(message)
        if any(keyword in message.lower() for keyword in ["done", "finish", "completed", "complete"]):
            self.is_recording = False
            self.save_task_demonstration()
            response = "I've learned this task! Thank you for the demonstration. " + \
                       "You can now assign similar tasks to me in the future. 😊"
            self.chat_area.add_message("Xiao Hong", response)
            self.conversation_state = "ready"
        else:
            response = "I'm still learning... Please continue your demonstration."
            self.chat_area.add_message("Xiao Hong", response)

    def handle_ready_state(self, message):
        """
        Handle messages in the ready state
        Args:
            message: User message
        """
        response = "How else can I help you? I've learned the task you demonstrated and am ready to assist!"
        self.chat_area.add_message("Xiao Hong", response)

    def save_task_demonstration(self):
        """Save the recorded task demonstration to a file"""
        try:
            with open("task_demonstration.json", "w", encoding="utf-8") as f:
                json.dump(self.task_demonstration, f, ensure_ascii=False, indent=2)
            self.chat_area.add_message("System", "Task demonstration saved successfully")
        except Exception as e:
            self.chat_area.add_message("System", f"Error saving task demonstration: {str(e)}")

View File

@@ -0,0 +1,81 @@
"""
Input listener module for keyboard and mouse events
"""
from pynput import mouse, keyboard
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
from src.utils.screenshot import get_screenshot
class InputListener(QObject):
    """
    Class for listening to keyboard and mouse input events
    Emits signals when actions are detected
    """
    # Emitted with a dict describing each observed action: "type"
    # ("mouse"/"keyboard"), "event", optional "position", and the
    # "screenshot_path" captured at that moment.
    action_detected = pyqtSignal(dict)
    # Emitted once from stop_listen() so the owning QThread can quit.
    terminated = pyqtSignal()

    def __init__(self):
        """Initialize the input listener"""
        super().__init__()
        self.mouse_listener = None
        self.keyboard_listener = None

    @pyqtSlot()
    def start_listen(self):
        """Start listening for mouse and keyboard events"""
        # Create both mouse and keyboard listeners
        self.mouse_listener = mouse.Listener(
            on_click=self.on_click,
            on_scroll=self.on_scroll
        )
        self.keyboard_listener = keyboard.Listener(
            on_release=self.on_release
        )
        # Start both listeners (each runs its own pynput thread)
        self.mouse_listener.start()
        self.keyboard_listener.start()

    # NOTE(review): the extra ``injected`` parameter in the callbacks below
    # is only supplied by newer pynput releases on some platforms — confirm
    # the pinned pynput version passes it, otherwise these callbacks fail
    # with a TypeError at the first event.
    def on_click(self, x, y, button, pressed, injected):
        """
        Handle mouse click events
        Only emit on release (when pressed is False)
        """
        if not pressed:
            # Screenshot the state of the screen at the moment of the click.
            _, screenshot_path = get_screenshot()
            self.action_detected.emit({
                "type": "mouse",
                "event": button.name + " click",
                "position": (x, y),
                "screenshot_path": str(screenshot_path)
            })

    def on_scroll(self, x, y, dx, dy, injected):
        """Handle mouse scroll events"""
        _, screenshot_path = get_screenshot()
        scroll_direction = 'down' if dy < 0 else 'up'
        self.action_detected.emit({
            "type": "mouse",
            "event": f"scroll {scroll_direction}",
            "position": (x, y),
            "screenshot_path": str(screenshot_path)
        })

    def on_release(self, key, injected):
        """Handle keyboard release events"""
        _, screenshot_path = get_screenshot()
        self.action_detected.emit({
            "type": "keyboard",
            "event": str(key),
            "screenshot_path": str(screenshot_path)
        })

    def stop_listen(self):
        """Stop all listeners and emit terminated signal"""
        if self.mouse_listener:
            self.mouse_listener.stop()
        if self.keyboard_listener:
            self.keyboard_listener.stop()
        self.terminated.emit()

23
src/main.py Normal file
View File

@@ -0,0 +1,23 @@
"""
Main entry point for the AutoMate application
"""
import sys
from PyQt6.QtWidgets import QApplication
from src.ui.main_window import MainWindow
def main():
    """Launch the AutoMate Qt application and block until the user quits.

    Creates the QApplication, shows the main window, and propagates the
    event loop's exit code through sys.exit().
    """
    qt_app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    sys.exit(qt_app.exec())


if __name__ == "__main__":
    main()

3
src/ui/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
UI components for the AutoMate application
"""

157
src/ui/chat_area.py Normal file
View File

@@ -0,0 +1,157 @@
"""
Chat area component for displaying message history
"""
from PyQt6.QtWidgets import (QScrollArea, QWidget, QVBoxLayout)
from PyQt6.QtCore import Qt, QTimer
from PyQt6.QtGui import QPainter, QPen, QColor, QPixmap, QFont
import datetime
import os
from src.ui.message_widgets import MessageWidget, SystemMessageWidget
class ChatArea(QScrollArea):
    """
    Scrollable chat area for displaying messages
    """
    def __init__(self, parent=None):
        """
        Initialize the chat area
        Args:
            parent: Parent widget
        """
        super().__init__(parent)
        self.setWidgetResizable(True)
        self.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
        # Apply styling
        self.setStyleSheet("""
            QScrollArea {
                border: none;
                background-color: white;
            }
            QScrollBar:vertical {
                border: none;
                background: #f8f8f8;
                width: 8px;
                margin: 0px;
            }
            QScrollBar::handle:vertical {
                background: #d0d0d0;
                min-height: 30px;
                border-radius: 4px;
            }
            QScrollBar::handle:vertical:hover {
                background: #b0b0b0;
            }
            QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {
                height: 0px;
            }
        """)
        # Create content container that holds the message widgets; a
        # trailing stretch keeps messages packed to the top.
        self.content_widget = QWidget()
        self.content_widget.setStyleSheet("""
            background-color: white;
            padding-left: 20px;
            padding-right: 20px;
        """)
        self.content_layout = QVBoxLayout(self.content_widget)
        self.content_layout.setContentsMargins(10, 5, 10, 5)  # Reduce vertical margins further
        self.content_layout.setSpacing(8)  # Keep same spacing between messages
        self.content_layout.addStretch()
        self.setWidget(self.content_widget)
        # Create avatar images
        self.create_avatars()

    def create_avatars(self):
        """Create avatar images for the chat participants"""
        # Try to load the intern avatar; fall back to a drawn placeholder
        # if the file is missing or unreadable.
        avatar_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                                   "imgs", "xiaohong.jpg")
        try:
            self.intern_avatar = QPixmap(avatar_path)
            if self.intern_avatar.isNull():
                self.create_fallback_avatar()
            else:
                self.intern_avatar = self.intern_avatar.scaled(40, 40,
                                                               Qt.AspectRatioMode.KeepAspectRatio,
                                                               Qt.TransformationMode.SmoothTransformation)
        except:
            self.create_fallback_avatar()
        # Create a user avatar
        # NOTE(review): no fallback here — if imgs/user.png is missing the
        # pixmap is null and the user avatar renders empty; confirm the
        # asset is always shipped.
        avatar_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                                   "imgs", "user.png")
        original_pixmap = QPixmap(avatar_path)
        self.user_avatar = original_pixmap.scaled(40, 40,
                                                  Qt.AspectRatioMode.KeepAspectRatio,
                                                  Qt.TransformationMode.SmoothTransformation)
        # Create circular mask
        mask = QPixmap(40, 40)
        mask.fill(Qt.GlobalColor.transparent)
        painter = QPainter(mask)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        painter.setBrush(QColor("black"))
        painter.setPen(Qt.PenStyle.NoPen)
        painter.drawEllipse(0, 0, 40, 40)
        painter.end()
        # Apply mask to avatar: SourceIn composition keeps only the pixels
        # covered by the circle, producing a round avatar.
        masked_pixmap = QPixmap(40, 40)
        masked_pixmap.fill(Qt.GlobalColor.transparent)
        painter = QPainter(masked_pixmap)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        painter.drawPixmap(0, 0, mask)
        painter.setCompositionMode(QPainter.CompositionMode.CompositionMode_SourceIn)
        painter.drawPixmap(0, 0, self.user_avatar)
        painter.end()
        self.user_avatar = masked_pixmap

    def create_fallback_avatar(self):
        """Create a fallback avatar when image loading fails"""
        self.intern_avatar = QPixmap(40, 40)
        self.intern_avatar.fill(Qt.GlobalColor.transparent)
        painter = QPainter(self.intern_avatar)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        painter.setBrush(QColor("#ffebee"))
        painter.setPen(QPen(QColor("#f44336"), 2))
        painter.drawEllipse(2, 2, 36, 36)
        painter.setPen(QPen(QColor("#d32f2f"), 2))
        painter.setFont(QFont("Arial", 15, QFont.Weight.Bold))
        painter.drawText(14, 26, "小红")
        painter.end()

    def add_message(self, sender, text, is_user=False):
        """
        Add a new message to the chat area
        Args:
            sender: Message sender name
            text: Message content
            is_user: Whether this is a user message
        """
        timestamp = datetime.datetime.now().strftime("%H:%M")
        if sender == "System":
            message_widget = SystemMessageWidget(text)
        else:
            if is_user:
                message_widget = MessageWidget("", self.user_avatar, text, timestamp, True)
            else:
                message_widget = MessageWidget("", self.intern_avatar, text, timestamp, False)
        # Insert the message widget above the spacer
        self.content_layout.insertWidget(self.content_layout.count() - 1, message_widget)
        # Scroll to the bottom to show new message (delayed so the layout
        # has updated before the scrollbar maximum is read)
        QTimer.singleShot(100, self.scroll_to_bottom)

    def scroll_to_bottom(self):
        """Scroll the chat area to the bottom to show the latest messages"""
        self.verticalScrollBar().setValue(self.verticalScrollBar().maximum())

124
src/ui/input_area.py Normal file
View File

@@ -0,0 +1,124 @@
"""
Input area component for user message entry
"""
from PyQt6.QtWidgets import (QWidget, QTextEdit, QPushButton, QHBoxLayout, QVBoxLayout)
from PyQt6.QtCore import Qt, pyqtSignal
from PyQt6.QtGui import QFont, QColor
class InputArea(QWidget):
    """
    Input area for user to type and send messages
    """
    def __init__(self, message_callback, parent=None):
        """
        Initialize input area
        Args:
            message_callback: Function to call when a message is submitted
            parent: Parent widget
        """
        super().__init__(parent)
        self.message_callback = message_callback
        self.init_ui()

    def init_ui(self):
        """Initialize the UI components"""
        # Main layout
        main_layout = QVBoxLayout(self)
        main_layout.setContentsMargins(15, 8, 15, 15)
        # Input area with send button
        input_layout = QHBoxLayout()
        input_layout.setSpacing(10)
        input_layout.setContentsMargins(0, 0, 0, 0)
        # Text edit for input
        self.text_edit = QTextEdit()
        self.text_edit.setPlaceholderText("Please enter your response...")
        self.text_edit.setMinimumHeight(50)
        self.text_edit.setMaximumHeight(100)
        self.text_edit.setFont(QFont("Arial", 11))
        self.text_edit.setStyleSheet("""
            QTextEdit {
                border: 1px solid #e6e6e6;
                border-radius: 18px;
                padding: 10px 15px;
                background-color: #ffffff;
                color: #333333;
            }
            QTextEdit:focus {
                border: 1px solid #cccccc;
            }
        """)
        # Make return key submit the message (handled in eventFilter below)
        self.text_edit.installEventFilter(self)
        # Send button
        self.send_button = QPushButton("Send")
        self.send_button.setFont(QFont("Arial", 11, QFont.Weight.Bold))
        self.send_button.setMinimumSize(80, 50)
        self.send_button.setCursor(Qt.CursorShape.PointingHandCursor)
        self.send_button.setStyleSheet("""
            QPushButton {
                background-color: #1e88e5;
                color: white;
                border-radius: 18px;
                padding: 8px 16px;
                border: none;
            }
            QPushButton:hover {
                background-color: #1976d2;
            }
            QPushButton:pressed {
                background-color: #1565c0;
            }
            QPushButton:disabled {
                background-color: #e0e0e0;
                color: #9e9e9e;
            }
        """)
        self.send_button.clicked.connect(self.send_message)
        # Add widgets to layout (text edit gets most of the width)
        input_layout.addWidget(self.text_edit)
        input_layout.addWidget(self.send_button)
        input_layout.setStretchFactor(self.text_edit, 8)
        input_layout.setStretchFactor(self.send_button, 1)
        main_layout.addLayout(input_layout)

    def eventFilter(self, obj, event):
        """
        Handle keyboard events in the text edit.

        Plain Enter submits the message (event consumed); Shift+Enter falls
        through to insert a newline.

        Args:
            obj: Object that triggered the event
            event: The event object
        """
        if obj is self.text_edit and event.type() == event.Type.KeyPress:
            # Check for Enter key (without Shift for newline)
            if event.key() == Qt.Key.Key_Return and not event.modifiers() & Qt.KeyboardModifier.ShiftModifier:
                self.send_message()
                return True
        return super().eventFilter(obj, event)

    def send_message(self):
        """Send the current message"""
        message = self.text_edit.toPlainText().strip()
        # Ignore empty/whitespace-only input
        if message:
            # Call the callback
            self.message_callback(message)
            # Clear the input
            self.text_edit.clear()

    def set_enabled(self, enabled):
        """
        Enable or disable the input area
        Args:
            enabled: Whether the input area should be enabled
        """
        self.text_edit.setEnabled(enabled)
        self.send_button.setEnabled(enabled)

86
src/ui/main_window.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Main application window for the AutoMate interface
"""
from PyQt6.QtWidgets import QMainWindow, QWidget, QHBoxLayout, QVBoxLayout, QApplication
from PyQt6.QtCore import Qt
from src.ui.chat_area import ChatArea
from src.ui.input_area import InputArea
from src.ui.profile_widget import ProfileWidget
from src.ui.mini_window import MiniWindow
from src.core.conversation_manager import ConversationManager
class MainWindow(QMainWindow):
    """
    Main application window containing all UI components
    """
    def __init__(self):
        """Initialize the main window"""
        super().__init__()
        self.setWindowTitle("Chat with Xiao Hong")
        self.setGeometry(100, 100, 1200, 800)
        self.setMinimumSize(900, 600)
        self.setStyleSheet("""
            QMainWindow {
                background-color: white;
            }
        """)
        # Center the window on screen
        screen = QApplication.primaryScreen().availableGeometry()
        window_size = self.geometry()
        x = (screen.width() - window_size.width()) // 2
        y = (screen.height() - window_size.height()) // 2
        self.move(x, y)
        # Create central widget
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        # Create main layout: profile panel on the left, chat on the right
        main_layout = QHBoxLayout(central_widget)
        main_layout.setContentsMargins(0, 0, 0, 0)
        main_layout.setSpacing(0)
        # Create profile widget
        self.profile_widget = ProfileWidget()
        # Create chat container
        chat_container = QWidget()
        chat_layout = QVBoxLayout(chat_container)
        chat_layout.setContentsMargins(0, 0, 0, 0)
        chat_layout.setSpacing(0)
        chat_container.setStyleSheet("""
            background-color: white;
        """)
        # Create chat area
        self.chat_area = ChatArea()
        # Create mini window for demonstration mode
        self.mini_window = MiniWindow(self.finish_demonstration)
        # Create conversation manager
        self.conversation_manager = ConversationManager(self.chat_area, self.mini_window)
        # Set parent for conversation manager
        # NOTE(review): this shadows the inherited QObject.parent() method
        # with an attribute so the manager can reach this window via
        # self.parent() — fragile; consider passing the window explicitly.
        self.conversation_manager.parent = lambda: self
        # Connect mini window to chat area avatar
        self.mini_window.set_avatar(self.chat_area.intern_avatar)
        # Create input area
        self.input_area = InputArea(self.conversation_manager.process_message)
        # Add to chat layout (chat area stretches, input stays fixed)
        chat_layout.addWidget(self.chat_area, 1)
        chat_layout.addWidget(self.input_area, 0)
        # Add to main layout (1:5 width ratio of profile to chat)
        main_layout.addWidget(self.profile_widget, 1)
        main_layout.addWidget(chat_container, 5)

    def finish_demonstration(self):
        """Finish demonstration callback for mini window"""
        self.conversation_manager.finish_demonstration()

130
src/ui/message_widgets.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Message widget components for chat interface
"""
from PyQt6.QtWidgets import (QWidget, QLabel, QHBoxLayout, QVBoxLayout)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont, QColor, QPalette
class MessageWidget(QWidget):
    """
    Widget for displaying chat messages with avatar, name, text and timestamp
    """
    def __init__(self, sender_name, avatar_pixmap, message_text, timestamp, is_user=False):
        """
        Initialize a message widget
        Args:
            sender_name: Name of the message sender
            avatar_pixmap: Pixmap for the sender's avatar
            message_text: Text content of the message
            timestamp: Time the message was sent
            is_user: Whether this is a user message (affects styling)
        """
        super().__init__()
        self.is_user = is_user
        self.init_ui(sender_name, avatar_pixmap, message_text, timestamp)

    def init_ui(self, sender_name, avatar_pixmap, message_text, timestamp):
        """Initialize the UI components of the message widget"""
        # Create main layout
        main_layout = QHBoxLayout(self)
        main_layout.setContentsMargins(0, 2, 0, 2)  # Reduce vertical padding further
        main_layout.setSpacing(4)  # Reduce spacing between avatar and message
        # Add avatar to left or right based on if user message
        avatar_label = QLabel()
        avatar_label.setPixmap(avatar_pixmap)
        avatar_label.setFixedSize(40, 40)
        avatar_label.setStyleSheet("""
            QLabel {
                border-radius: 20px;
                background-color: transparent;
                min-width: 40px;
                min-height: 40px;
            }
        """)
        # Create message content layout
        message_container = QWidget()
        message_layout = QVBoxLayout(message_container)
        message_layout.setContentsMargins(8, 6, 8, 6)  # Reduce message container padding
        message_layout.setSpacing(2)  # Reduce spacing between text and timestamp
        # Configure message text
        text_label = QLabel(message_text)
        text_label.setFont(QFont("Arial", 11))
        text_label.setWordWrap(True)
        text_label.setMinimumWidth(600)  # Set minimum width
        text_label.setMaximumWidth(800)  # Increase maximum width
        text_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
        # Add timestamp
        time_label = QLabel(timestamp)
        time_label.setFont(QFont("Arial", 8))
        time_label.setStyleSheet("color: #888888;")
        # Arrange components according to message direction:
        # user messages are right-aligned (blue bubble, avatar on the right),
        # intern messages are left-aligned (pink bubble, avatar on the left).
        if self.is_user:
            message_container.setStyleSheet("""
                background-color: #e8f4ff;
                border-radius: 20px;
                border-top-right-radius: 6px;
                padding: 8px;
                color: #2c3e50;
                margin: 2px;
            """)
            time_label.setAlignment(Qt.AlignmentFlag.AlignRight)
            message_layout.addWidget(text_label)
            message_layout.addWidget(time_label)
            main_layout.addStretch()
            main_layout.addWidget(message_container)
            main_layout.addWidget(avatar_label)
        else:
            message_container.setStyleSheet("""
                background-color: #fff2f2;
                border-radius: 20px;
                border-top-left-radius: 6px;
                padding: 8px;
                color: #2c3e50;
                margin: 2px;
            """)
            message_layout.addWidget(text_label)
            message_layout.addWidget(time_label)
            main_layout.addWidget(avatar_label)
            main_layout.addWidget(message_container)
            main_layout.addStretch()
class SystemMessageWidget(QWidget):
    """Widget for displaying system messages (centered grey pill, no avatar)"""
    def __init__(self, message_text):
        """
        Initialize a system message widget
        Args:
            message_text: Text content of the system message
        """
        super().__init__()
        self.init_ui(message_text)

    def init_ui(self, message_text):
        """Initialize the UI components of the system message widget"""
        main_layout = QHBoxLayout(self)
        main_layout.setContentsMargins(20, 3, 20, 3)
        # Create system message label
        text_label = QLabel(message_text)
        text_label.setFont(QFont("Arial", 10, QFont.Weight.Normal))
        text_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        text_label.setMaximumWidth(350)  # Make system messages narrower
        text_label.setStyleSheet("""
            background-color: #f0f0f0;
            border-radius: 16px;
            padding: 8px 14px;
            color: #505050;
        """)
        # Stretches on both sides keep the pill horizontally centered.
        main_layout.addStretch()
        main_layout.addWidget(text_label)
        main_layout.addStretch()

109
src/ui/mini_window.py Normal file
View File

@@ -0,0 +1,109 @@
"""
Mini window component for task demonstration mode
"""
from PyQt6.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QApplication)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont
class MiniWindow(QMainWindow):
"""
Small floating window displayed during task demonstration
"""
def __init__(self, finish_callback, parent=None):
"""
Initialize the mini window
Args:
finish_callback: Function to call when demonstration is finished
parent: Parent widget
"""
super().__init__(parent)
self.setWindowTitle("Learning Mode")
self.setFixedSize(250, 150)
# Position in bottom-right corner
desktop = QApplication.primaryScreen().availableGeometry()
self.move(desktop.width() - 270, desktop.height() - 170)
# Set frameless and always-on-top flags
self.setWindowFlags(
Qt.WindowType.FramelessWindowHint |
Qt.WindowType.WindowStaysOnTopHint
)
# Set window style
self.setStyleSheet("""
QMainWindow {
background-color: #fff8f8;
border: 2px solid #ffcdd2;
border-radius: 10px;
}
""")
# Create central widget
mini_central = QWidget()
self.setCentralWidget(mini_central)
# Create layout
mini_layout = QVBoxLayout(mini_central)
# Create header with avatar and title
mini_header = QWidget()
header_layout = QHBoxLayout(mini_header)
self.mini_avatar = QLabel()
# Avatar will be set from the main window
header_layout.addWidget(self.mini_avatar)
mini_title = QLabel("Learning in progress...")
mini_title.setFont(QFont("Arial", 10, QFont.Weight.Bold))
mini_title.setStyleSheet("color: #d32f2f;")
header_layout.addWidget(mini_title)
header_layout.addStretch()
# Status information
self.status_label = QLabel("Recording your actions, please continue demonstration...")
self.status_label.setWordWrap(True)
self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.status_label.setFont(QFont("Arial", 10))
self.status_label.setStyleSheet("color: #333333; margin: 10px;")
# Finish button
finish_button = QPushButton("Finish Demo")
finish_button.setFont(QFont("Arial", 10, QFont.Weight.Bold))
finish_button.setCursor(Qt.CursorShape.PointingHandCursor)
finish_button.setStyleSheet("""
QPushButton {
background-color: #f44336;
color: white;
border-radius: 8px;
padding: 8px;
border: none;
}
QPushButton:hover {
background-color: #ef5350;
}
QPushButton:pressed {
background-color: #d32f2f;
}
""")
finish_button.clicked.connect(finish_callback)
# Add to layout
mini_layout.addWidget(mini_header)
mini_layout.addWidget(self.status_label)
mini_layout.addWidget(finish_button)
def set_avatar(self, avatar_pixmap):
    """Display the given avatar in the mini window's avatar label.

    Args:
        avatar_pixmap: QPixmap containing the avatar image; it is scaled
            down to 30x30 (aspect ratio preserved, smooth scaling).
    """
    thumbnail = avatar_pixmap.scaled(
        30, 30,
        Qt.AspectRatioMode.KeepAspectRatio,
        Qt.TransformationMode.SmoothTransformation,
    )
    # Fix the label size to match the thumbnail, then show it
    self.mini_avatar.setFixedSize(30, 30)
    self.mini_avatar.setPixmap(thumbnail)

194
src/ui/profile_widget.py Normal file
View File

@@ -0,0 +1,194 @@
"""
Profile widget component for displaying intern information
"""
import os
from PyQt6.QtWidgets import (QWidget, QLabel, QVBoxLayout, QHBoxLayout)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont, QPixmap, QColor, QPainter, QPen
class ProfileWidget(QWidget):
    """Widget displaying the intern's profile information.

    Fixed-width (280px) white side panel with a circular avatar, name/title
    header, and a list of static profile facts.
    """

    def __init__(self, parent=None):
        """Initialize the profile widget.

        Args:
            parent: Parent widget
        """
        super().__init__(parent)
        self.setFixedWidth(280)
        self.setStyleSheet("""
            background-color: white;
        """)
        self.init_ui()

    def init_ui(self):
        """Initialize the UI components."""
        # Main layout
        main_layout = QVBoxLayout(self)
        main_layout.setContentsMargins(20, 30, 20, 20)
        main_layout.setSpacing(20)
        # Profile header with avatar and name
        self.create_profile_header(main_layout)
        # Add profile information
        self.create_profile_info(main_layout)
        # Push everything to the top
        main_layout.addStretch()

    def create_profile_header(self, layout):
        """Create the profile header section (avatar, name, title).

        Args:
            layout: Layout to add the header widgets to
        """
        # Header layout
        header_layout = QVBoxLayout()
        header_layout.setAlignment(Qt.AlignmentFlag.AlignCenter)
        header_layout.setSpacing(12)
        # Avatar image lives at <project root>/imgs/xiaohong.jpg
        # (three dirname() calls climb from src/ui/ to the project root)
        avatar_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                                   "imgs", "xiaohong.jpg")
        avatar_label = QLabel()
        avatar_label.setFixedSize(140, 140)
        avatar_label.setStyleSheet("""
            border-radius: 70px;
            background-color: white;
        """)
        try:
            avatar_pixmap = QPixmap(avatar_path)
            if not avatar_pixmap.isNull():
                scaled_avatar = avatar_pixmap.scaled(140, 140,
                                                     Qt.AspectRatioMode.KeepAspectRatio,
                                                     Qt.TransformationMode.SmoothTransformation)
                # Create circular mask: a black antialiased ellipse on a
                # transparent background
                mask = QPixmap(140, 140)
                mask.fill(Qt.GlobalColor.transparent)
                painter = QPainter(mask)
                painter.setRenderHint(QPainter.RenderHint.Antialiasing)
                painter.setBrush(QColor("black"))
                painter.setPen(Qt.PenStyle.NoPen)
                painter.drawEllipse(0, 0, 140, 140)
                painter.end()
                # Apply mask: SourceIn composition keeps only the avatar
                # pixels that overlap the opaque ellipse
                masked_pixmap = QPixmap(140, 140)
                masked_pixmap.fill(Qt.GlobalColor.transparent)
                painter = QPainter(masked_pixmap)
                painter.setRenderHint(QPainter.RenderHint.Antialiasing)
                painter.drawPixmap(0, 0, mask)
                painter.setCompositionMode(QPainter.CompositionMode.CompositionMode_SourceIn)
                painter.drawPixmap(0, 0, scaled_avatar)
                painter.end()
                avatar_label.setPixmap(masked_pixmap)
            else:
                self.create_fallback_avatar(avatar_label)
        except Exception:
            # Fix: was a bare `except:`, which also swallowed SystemExit and
            # KeyboardInterrupt. Any image-loading failure falls back to the
            # drawn placeholder avatar.
            self.create_fallback_avatar(avatar_label)
        avatar_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        header_layout.addWidget(avatar_label)
        # Name
        name_label = QLabel("Xiao Hong")
        name_label.setFont(QFont("Arial", 18, QFont.Weight.Bold))
        name_label.setStyleSheet("color: #333333;")
        name_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        header_layout.addWidget(name_label)
        # Title
        title_label = QLabel("AI Assistant")
        title_label.setFont(QFont("Arial", 13))
        title_label.setStyleSheet("color: #777777;")
        title_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        header_layout.addWidget(title_label)
        layout.addLayout(header_layout)

    def create_fallback_avatar(self, avatar_label):
        """Create a fallback avatar when the image cannot be loaded.

        Draws a grey circle with the initials "XH" inside it.

        Args:
            avatar_label: QLabel to set the fallback avatar to
        """
        avatar_pixmap = QPixmap(140, 140)
        avatar_pixmap.fill(Qt.GlobalColor.transparent)
        painter = QPainter(avatar_pixmap)
        painter.setRenderHint(QPainter.RenderHint.Antialiasing)
        painter.setBrush(QColor("#f8f9fa"))
        painter.setPen(QPen(QColor("#dee2e6"), 3))
        painter.drawEllipse(3, 3, 134, 134)
        painter.setPen(QPen(QColor("#6c757d"), 2))
        painter.setFont(QFont("Arial", 60, QFont.Weight.Bold))
        painter.drawText(38, 90, "XH")
        painter.end()
        avatar_label.setPixmap(avatar_pixmap)

    def create_profile_info(self, layout):
        """Create the profile information section (static fact list).

        Args:
            layout: Layout to add the info widgets to
        """
        # Info layout
        info_layout = QVBoxLayout()
        info_layout.setSpacing(14)
        info_layout.setContentsMargins(0, 0, 0, 0)
        # Info items: (title, value) pairs rendered top to bottom
        info_items = [
            ("Age", "23"),
            ("Education", "East China Normal University"),
            ("Major", "Computer Science"),
            ("Skills", "Data Analysis, Document Processing"),
            ("Languages", "Chinese, English")
        ]
        for title, value in info_items:
            item_layout = QVBoxLayout()
            item_layout.setSpacing(4)
            item_layout.setContentsMargins(0, 0, 0, 0)
            title_label = QLabel(title)
            title_label.setFont(QFont("Arial", 12, QFont.Weight.Bold))
            title_label.setStyleSheet("color: #555555;")
            value_label = QLabel(value)
            value_label.setFont(QFont("Arial", 12))
            value_label.setWordWrap(True)
            value_label.setStyleSheet("color: #333333;")
            item_layout.addWidget(title_label)
            item_layout.addWidget(value_label)
            info_widget = QWidget()
            info_widget.setLayout(item_layout)
            info_widget.setStyleSheet("background-color: white;")
            info_layout.addWidget(info_widget)
        layout.addLayout(info_layout)

    def create_status_section(self, layout):
        """Create the status section - now removed.

        Kept as a no-op for backward compatibility with any existing callers.

        Args:
            layout: Layout to add the status widgets to
        """
        # This function is now empty as we're removing the status section
        pass

3
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""
Utility functions for the AutoMate application
"""

72
src/utils/screenshot.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Screenshot utility module for capturing screen content
"""
from io import BytesIO
import os
from pathlib import Path
from uuid import uuid4
from PIL import Image
import pyautogui
# Output directory for screenshots (relative to the process working directory;
# created on demand by get_screenshot)
OUTPUT_DIR = "./tmp/outputs"
def get_screenshot(screen_region=None, is_cursor=True):
    """Take a screenshot, optionally including the mouse cursor and
    optionally masking everything outside a rectangular region.

    Args:
        screen_region: optional (x1, y1, x2, y2) tuple; when given, only
            that rectangle is kept visible and the rest is blacked out.
        is_cursor: draw the cursor into the capture when True.

    Returns:
        tuple: (PIL.Image screenshot, pathlib.Path of the saved PNG)
    """
    out_dir = Path(OUTPUT_DIR)
    out_dir.mkdir(parents=True, exist_ok=True)
    target = out_dir / f"screenshot_{uuid4().hex}.png"

    # Grab the raw capture as an in-memory PNG buffer
    if is_cursor:
        buffer = capture_screen_with_cursor()
    else:
        raw = pyautogui.screenshot()
        buffer = BytesIO()
        raw.save(buffer, 'PNG')
    image = Image.open(buffer)

    # Optionally black out everything except the requested region
    if screen_region and len(screen_region) == 4:
        x1, y1, x2, y2 = screen_region
        masked = Image.new("RGBA", image.size, (0, 0, 0, 255))
        # Paste the cropped region back at its original coordinates
        masked.paste(image.crop((x1, y1, x2, y2)), (x1, y1, x2, y2))
        image = masked

    image.save(target)
    return image, target
def capture_screen_with_cursor():
    """Grab the full screen and composite the mouse cursor onto it.

    The cursor sprite is loaded from <project root>/imgs/cursor.png, shrunk
    by 1.5x, and pasted at the current pointer position using its own alpha
    channel as the mask.

    Returns:
        BytesIO: PNG-encoded screenshot with cursor, rewound to position 0.
    """
    cursor_file = os.path.join(
        os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
        "imgs", "cursor.png")
    shot = pyautogui.screenshot()
    pointer_x, pointer_y = pyautogui.position()
    cursor_img = Image.open(cursor_file)
    cursor_img = cursor_img.resize((int(cursor_img.width / 1.5),
                                    int(cursor_img.height / 1.5)))
    # Third argument uses the cursor image itself as the alpha mask
    shot.paste(cursor_img, (pointer_x, pointer_y), cursor_img)
    encoded = BytesIO()
    shot.save(encoded, 'PNG')
    encoded.seek(0)
    return encoded

110
task_demonstration.json Normal file
View File

@@ -0,0 +1,110 @@
[
{
"type": "mouse",
"event": "left click",
"position": [
1184,
1025
],
"screenshot_path": "tmp\\outputs\\screenshot_1d542843e6e745199a36fa367995a7be.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1188,
711
],
"screenshot_path": "tmp\\outputs\\screenshot_82bc33a76fda43c5b1faec1ff0dffe60.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1324,
577
],
"screenshot_path": "tmp\\outputs\\screenshot_7f8b51c9937e46e3a6e829e3426c2aab.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1402,
467
],
"screenshot_path": "tmp\\outputs\\screenshot_5b3e8d35a309483d9979fd1cfd991af1.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1457,
289
],
"screenshot_path": "tmp\\outputs\\screenshot_44de70ef74234ee082139da58d0512d2.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1444,
396
],
"screenshot_path": "tmp\\outputs\\screenshot_8364d28720c54f6cb4abf34c0b16ebc1.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1201,
385
],
"screenshot_path": "tmp\\outputs\\screenshot_b67f8a493fc144ceb656c8aad3d368b0.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1052,
344
],
"screenshot_path": "tmp\\outputs\\screenshot_c657989d97d94e54b5173f911eeacf29.png"
},
{
"type": "mouse",
"event": "right click",
"position": [
1007,
345
],
"screenshot_path": "tmp\\outputs\\screenshot_d80d3f85d51f41cc9ae4bf573a14106d.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
979,
453
],
"screenshot_path": "tmp\\outputs\\screenshot_458eca72c66f4fb8bb63a2b61897c209.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1137,
570
],
"screenshot_path": "tmp\\outputs\\screenshot_bd5a272513864f6f82b664ecf63084ac.png"
},
{
"type": "mouse",
"event": "left click",
"position": [
1947,
1250
],
"screenshot_path": "tmp\\outputs\\screenshot_db51b11fe77b4819a34876117fbc85b3.png"
}
]

View File

@@ -1,3 +0,0 @@
"""
autoMate UI package
"""

View File

@@ -1,190 +0,0 @@
"""
Worker thread for handling agent operations
"""
import json
from PyQt6.QtCore import QThread, pyqtSignal
from auto_control.loop import sampling_loop_sync
from xbrain.utils.config import Config
class AgentWorker(QThread):
    """Worker thread for running agent operations asynchronously.

    Drives ``sampling_loop_sync`` off the GUI thread, mirrors agent progress
    into the shared ``state`` dict, and notifies the UI through Qt signals.
    """
    update_signal = pyqtSignal(list, list)  # (chatbox_messages, tasks) for UI refresh
    status_signal = pyqtSignal(str)  # Signal for status updates
    task_signal = pyqtSignal(str)  # Signal for current task
    error_signal = pyqtSignal(str)  # Error signal

    def __init__(self, user_input, state, vision_agent):
        """Store the request and shared state.

        Args:
            user_input: text the user typed.
            state: shared mutable app-state dict (messages, tasks, flags).
            vision_agent: VisionAgent used by the sampling loop.
        """
        super().__init__()
        self.user_input = user_input
        self.state = state
        self.vision_agent = vision_agent

    def run(self):
        """Thread entry point: run the agent loop and stream UI updates."""
        # Reset stop flag
        if self.state["stop"]:
            self.state["stop"] = False
        # Configure API
        config = Config()
        config.set_openai_config(
            base_url=self.state["base_url"],
            api_key=self.state["api_key"],
            model=self.state["model"]
        )
        # Add user message
        self.state["messages"].append({"role": "user", "content": self.user_input})
        self.state["chatbox_messages"].append({"role": "user", "content": self.user_input})
        # Send initial update
        self.update_signal.emit(self.state["chatbox_messages"], [])
        self.status_signal.emit("Starting analysis...")
        try:
            # Process with agent
            loop_iterator = sampling_loop_sync(
                model=self.state["model"],
                messages=self.state["messages"],
                vision_agent=self.vision_agent,
                screen_region=self.state.get("screen_region", None)
            )
            for _ in loop_iterator:
                # Check the stop flag first and exit the loop immediately if set
                if self.state["stop"]:
                    # Append the stop notice
                    self.state["chatbox_messages"].append({"role": "assistant", "content": "<span style='color:red'>⚠️ 操作已被用户停止</span>"})
                    self.status_signal.emit("操作已被用户停止")
                    # Refresh the UI
                    self.update_signal.emit(self.state["chatbox_messages"],
                                            [[task["status"], task["task"]] for task in self.state["tasks"]])
                    # Return right away; do no further processing
                    return
                # task_plan_agent first response: messages == [user, plan]
                if len(self.state["messages"]) == 2:
                    task_list = json.loads(self.state["messages"][-1]["content"])["task_list"]
                    for task in task_list:
                        self.state["tasks"].append({
                            "status": "",
                            "task": task
                        })
                else:
                    # Reset all task statuses
                    for i in range(len(self.state["tasks"])):
                        self.state["tasks"][i]["status"] = ""
                    # Update task progress from the agent's latest JSON reply
                    content_json = json.loads(self.state["messages"][-1]["content"])
                    task_completed_number = content_json["current_task_id"]
                    # Update status with reasoning
                    if "reasoning" in content_json:
                        self.status_signal.emit(content_json["reasoning"])
                    # Update current task
                    if task_completed_number < len(self.state["tasks"]):
                        current_task = self.state["tasks"][task_completed_number]["task"]
                        self.task_signal.emit(current_task)
                    # NOTE(review): both branches below assign "" — same as the
                    # reset above, so completed tasks get no distinct marker.
                    # Looks like a completed-status marker was intended; confirm.
                    if task_completed_number > len(self.state["tasks"]) + 1:
                        for i in range(len(self.state["tasks"])):
                            self.state["tasks"][i]["status"] = ""
                    else:
                        for i in range(task_completed_number + 1):
                            self.state["tasks"][i]["status"] = ""
                # Check stop flag again
                if self.state["stop"]:
                    self.state["chatbox_messages"].append({"role": "assistant", "content": "<span style='color:red'>⚠️ Operation stopped by user</span>"})
                    self.status_signal.emit("Operation stopped by user")
                    self.update_signal.emit(self.state["chatbox_messages"],
                                            [[task["status"], task["task"]] for task in self.state["tasks"]])
                    return
                # Reconstruct chat messages from original messages
                self.state["chatbox_messages"] = []
                for message in self.state["messages"]:
                    formatted_content, json_reasoning = self.format_message_content(message["content"])
                    # Add json reasoning as a separate message if exists
                    if json_reasoning:
                        self.state["chatbox_messages"].append({
                            "role": message["role"],
                            "content": json_reasoning
                        })
                    # Add formatted content
                    self.state["chatbox_messages"].append({
                        "role": message["role"],
                        "content": formatted_content
                    })
                # Convert data format before returning results
                tasks_2d = [[task["status"], task["task"]] for task in self.state["tasks"]]
                self.update_signal.emit(self.state["chatbox_messages"], tasks_2d)
            # All done
            self.status_signal.emit("Task completed")
        except Exception as e:
            # Send error signal
            import traceback
            error_message = f"Error occurred: {str(e)}\n{traceback.format_exc()}"
            print(error_message)
            # Add error message to chat
            self.state["chatbox_messages"].append({
                "role": "assistant",
                "content": f"<span style='color:red'>⚠️ Network connection error: {str(e)}</span><br>Please check your network connection and API settings, or try again later."
            })
            self.update_signal.emit(self.state["chatbox_messages"],
                                    [[task["status"], task["task"]] for task in self.state["tasks"]])
            self.error_signal.emit(str(e))
            self.status_signal.emit(f"Error: {str(e)}")

    def format_message_content(self, content):
        """Format message content for display.

        Args:
            content: either a string or a multimodal list of
                {"type": "text"|"image_url", ...} items.

        Returns:
            tuple: (formatted HTML/text content, reasoning HTML or None)
        """
        # Handle list-type content (multimodal)
        if isinstance(content, list):
            formatted_content = ""
            json_reasoning = None
            for item in content:
                if item["type"] == "image_url":
                    # Render images at reduced size inside the chat box
                    formatted_content += f'<br/><img style="width: 50%; max-width: 400px;" src="{item["image_url"]["url"]}">'
                elif item["type"] == "text":
                    if self.is_json_format(item["text"]):
                        reasoning, details = self.format_json_content(item["text"])
                        json_reasoning = reasoning
                        formatted_content += details
                    else:
                        formatted_content += item["text"]
            return formatted_content, json_reasoning
        # Handle string content
        if self.is_json_format(content):
            reasoning, _ = self.format_json_content(content)
            formatted_content = json.dumps(json.loads(content), indent=4, ensure_ascii=False)
            return formatted_content, reasoning
        return content, None

    def format_json_content(self, json_content):
        """Format JSON content as a reasoning headline plus a collapsible dump.

        Raises:
            KeyError: if the JSON lacks a "reasoning" key.
        """
        content_json = json.loads(json_content)
        reasoning = f'<h3>{content_json["reasoning"]}</h3>'
        details = f'<br/> <details> <summary>Detail</summary> <pre>{json.dumps(content_json, indent=4, ensure_ascii=False)}</pre> </details>'
        return reasoning, details

    def is_json_format(self, text):
        """Return True if *text* parses as JSON.

        Fix: the bare ``except:`` also caught SystemExit/KeyboardInterrupt;
        only the exceptions json.loads actually raises are caught now
        (JSONDecodeError is a ValueError subclass, TypeError for non-str).
        """
        try:
            json.loads(text)
            return True
        except (ValueError, TypeError):
            return False

View File

@@ -1,69 +0,0 @@
"""
Chat panel for autoMate
"""
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel, QTextEdit
from PyQt6.QtGui import QTextCursor, QTextCharFormat, QColor
class ChatPanel(QWidget):
    """Read-only chat history panel backed by a QTextEdit."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setup_ui()

    def setup_ui(self):
        """Initialize chat panel UI."""
        chat_layout = QVBoxLayout(self)
        chat_label = QLabel("Chat History")
        self.chat_display = QTextEdit()
        self.chat_display.setReadOnly(True)
        chat_layout.addWidget(chat_label)
        chat_layout.addWidget(self.chat_display)

    def update_chat(self, chatbox_messages):
        """Re-render the whole chat display from the message list.

        Args:
            chatbox_messages: list of {"role": str, "content": str} dicts.
        """
        self.chat_display.clear()
        for msg in chatbox_messages:
            role = msg["role"]
            content = msg["content"]
            # Fix: the original built a QTextCharFormat and set its foreground
            # but never applied it, so role labels always rendered in the
            # default color. Apply the role color via setTextColor instead.
            if role == "user":
                self.chat_display.setTextColor(QColor(0, 0, 255))  # Blue for user
                self.chat_display.append("You:")
            else:
                self.chat_display.setTextColor(QColor(0, 128, 0))  # Green for AI
                self.chat_display.append("AI:")
            # Reset to default black for the message body
            self.chat_display.setTextColor(QColor(0, 0, 0))
            # Add content
            cursor = self.chat_display.textCursor()
            cursor.movePosition(QTextCursor.MoveOperation.End)
            # Special handling for HTML content (crude tag sniff)
            if "<" in content and ">" in content:
                self.chat_display.insertHtml(content)
                self.chat_display.append("")  # Add empty line
            else:
                self.chat_display.append(content)
                self.chat_display.append("")  # Add empty line
        # Scroll to bottom
        self.chat_display.verticalScrollBar().setValue(
            self.chat_display.verticalScrollBar().maximum()
        )

    def append_message(self, message, color=None):
        """Append a single message, optionally wrapped in a colored span."""
        if color:
            self.chat_display.append(f"<span style='color:{color}'>{message}</span>")
        else:
            self.chat_display.append(message)
        # Scroll to bottom
        self.chat_display.verticalScrollBar().setValue(
            self.chat_display.verticalScrollBar().maximum()
        )

    def clear(self):
        """Clear chat history."""
        self.chat_display.clear()

View File

@@ -1,48 +0,0 @@
"""
Demonstration panel for autoMate
"""
from PyQt6.QtWidgets import QWidget, QHBoxLayout, QLabel, QPushButton, QApplication
from PyQt6.QtCore import Qt, QPoint
class DemonstrationPanel(QWidget):
    """Small frameless, always-on-top banner shown while recording a demo."""

    def __init__(self, parent=None, stop_callback=None):
        """Create the panel and park it in the bottom-right corner.

        Args:
            parent: parent widget.
            stop_callback: callable invoked when the Stop button is clicked.
        """
        super().__init__(parent, Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint)
        self.stop_callback = stop_callback
        self.setup_ui()
        self.position_to_bottom_right()

    def setup_ui(self):
        """Build the banner: recording label, stop button, styling."""
        row = QHBoxLayout()
        self.setLayout(row)
        # Recording indicator
        banner = QLabel("autoMate recording...")
        banner.setStyleSheet("color: #4CAF50; font-weight: bold; font-size: 14px;")
        row.addWidget(banner)
        # Stop button ends the demonstration via the callback
        stop_btn = QPushButton("Stop")
        stop_btn.setStyleSheet("background-color: #ff0000; color: white;")
        stop_btn.clicked.connect(self.on_stop_clicked)
        row.addWidget(stop_btn)
        row.addStretch()
        # Panel chrome
        self.setStyleSheet("background-color: #f0f0f0; border: 1px solid #999; padding: 8px;")
        self.setFixedHeight(50)  # fixed height keeps the banner compact
        self.resize(250, 50)

    def position_to_bottom_right(self):
        """Move the panel to the bottom-right of the primary screen."""
        available = QApplication.primaryScreen().availableGeometry()
        frame = self.frameGeometry()
        corner = QPoint(
            available.width() - frame.width() - 20,
            available.height() - frame.height() - 20
        )
        self.move(corner)

    def on_stop_clicked(self):
        """Forward the click to the stop callback, if one was provided."""
        if self.stop_callback:
            self.stop_callback()

View File

@@ -1,90 +0,0 @@
"""
Hotkey editing widget
"""
import keyboard
from PyQt6.QtWidgets import QWidget, QHBoxLayout, QLineEdit, QPushButton
# Default stop hotkey (keyboard-library syntax: "+"-joined key names)
DEFAULT_STOP_HOTKEY = "alt+f3"
class HotkeyEdit(QWidget):
    """Widget for recording hotkey combinations.

    A read-only line edit shows the current hotkey; a toggle button starts
    and stops global key capture via the ``keyboard`` library.
    """

    def __init__(self, hotkey="", parent=None):
        """Build the line-edit + record-button pair.

        Args:
            hotkey: initial hotkey string to display.
            parent: parent widget.
        """
        super().__init__(parent)
        layout = QHBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        self.hotkey_input = QLineEdit(hotkey)
        self.hotkey_input.setReadOnly(True)
        self.hotkey_input.setPlaceholderText("Click to record hotkey")
        self.record_btn = QPushButton("Record")
        self.record_btn.clicked.connect(self.start_recording)
        layout.addWidget(self.hotkey_input, 1)
        layout.addWidget(self.record_btn)
        # Capture state: recording flag plus the set of key names seen
        self.recording = False
        self.keys_pressed = set()

    def start_recording(self):
        """Start recording a new hotkey (or stop if already recording)."""
        if self.recording:
            self.stop_recording()
            return
        self.hotkey_input.setText("Press keys...")
        self.record_btn.setText("Stop")
        self.recording = True
        self.keys_pressed = set()
        # Hook global keyboard events
        keyboard.hook(self.on_key_event)

    def stop_recording(self):
        """Stop recording and commit the captured combination."""
        keyboard.unhook(self.on_key_event)
        self.recording = False
        self.record_btn.setText("Record")
        # Convert captured keys to a "+"-joined hotkey string
        if self.keys_pressed:
            hotkey = '+'.join(sorted(self.keys_pressed))
            self.hotkey_input.setText(hotkey)
        else:
            self.hotkey_input.setText("")

    def on_key_event(self, event):
        """Handle global key events while recording."""
        if not self.recording:
            return
        # Skip key-up events
        if not event.event_type == keyboard.KEY_DOWN:
            return
        # Fix: the original if/else had byte-identical branches for modifier
        # and non-modifier keys; every key name is simply added to the set.
        key_name = event.name.lower()
        self.keys_pressed.add(key_name)
        # Show the keys captured so far
        self.hotkey_input.setText('+'.join(sorted(self.keys_pressed)))
        # Pressing Escape alone cancels the recording
        if len(self.keys_pressed) == 1 and 'esc' in self.keys_pressed:
            self.keys_pressed.clear()
            self.stop_recording()

    def get_hotkey(self):
        """Get the current hotkey string."""
        return self.hotkey_input.text()

    def set_hotkey(self, hotkey):
        """Set the hotkey string."""
        self.hotkey_input.setText(hotkey)

View File

@@ -1,25 +0,0 @@
"""
Main entry point for autoMate application
"""
import sys
import argparse
from PyQt6.QtWidgets import QApplication
from ui.main_window import MainWindow
def parse_arguments(argv=None):
    """Parse command line arguments.

    Args:
        argv: optional list of argument strings. Defaults to None, in which
            case argparse reads sys.argv[1:] — identical to the previous
            behavior, so existing callers are unaffected. Passing an explicit
            list makes the function testable without touching sys.argv.

    Returns:
        argparse.Namespace with ``windows_host_url`` and
        ``omniparser_server_url`` attributes.
    """
    parser = argparse.ArgumentParser(description="PyQt6 App")
    parser.add_argument("--windows_host_url", type=str, default='localhost:8006')
    parser.add_argument("--omniparser_server_url", type=str, default="localhost:8000")
    return parser.parse_args(argv)
def main():
    """Main application entry point: parse args, build the Qt app, run."""
    cli_args = parse_arguments()
    qt_app = QApplication(sys.argv)
    main_window = MainWindow(cli_args)
    main_window.show()
    # Hand control to the Qt event loop; propagate its exit code
    sys.exit(qt_app.exec())


if __name__ == "__main__":
    main()

View File

@@ -1,375 +0,0 @@
"""
Main application window
"""
import os
import sys
import keyboard
from pathlib import Path
from PyQt6.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QLineEdit, QPushButton, QSplitter, QMessageBox,
QDialog, QSystemTrayIcon, QApplication)
from PyQt6.QtCore import Qt, pyqtSlot, QSize, QMetaObject, Q_ARG, Qt, QObject, pyqtSignal
from PyQt6.QtGui import QPixmap, QIcon, QKeySequence, QShortcut
from auto_control.agent.vision_agent import VisionAgent
from util.download_weights import OMNI_PARSER_DIR
from ui.theme import apply_theme
from ui.settings_dialog import SettingsDialog
from ui.agent_worker import AgentWorker
from ui.tray_icon import StatusTrayIcon
from ui.hotkey_edit import DEFAULT_STOP_HOTKEY
from ui.task_panel import TaskPanel
from ui.chat_panel import ChatPanel
from ui.recording_manager import RecordingManager
from ui.settings_manager import SettingsManager
# Intro text for application (shown under the header in the main window)
INTRO_TEXT = '''
Based on Omniparser to control desktop!
'''
class MainWindow(QMainWindow):
    """Main application window.

    Hosts the chat/task panels, the agent worker thread, the recording
    manager, a system-tray icon, and a global stop hotkey.
    """
    # Signal used to route stop requests onto the main (GUI) thread safely
    stop_signal = pyqtSignal()

    def __init__(self, args):
        """Create the main window.

        Args:
            args: parsed command line options (used by other components).
        """
        super().__init__()
        self.args = args
        # Connect the stop signal to its main-thread slot
        self.stop_signal.connect(self._stop_process_main_thread)
        # Initialize settings manager
        self.settings_manager = SettingsManager()
        # Initialize state
        self.state = self.setup_initial_state()
        # Initialize Agent
        self.vision_agent = VisionAgent(
            yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt")
        )
        # Initialize recording manager
        self.recording_manager = RecordingManager(self)
        # Setup UI and tray icon
        self.setup_tray_icon()
        self.setWindowTitle("autoMate")
        self.setMinimumSize(1200, 800)
        self.init_ui()
        self.apply_theme()
        # Register hotkey handler
        self.hotkey_handler = None
        self.register_stop_hotkey()

    def setup_tray_icon(self):
        """Setup system tray icon; fall back to no tray on any failure."""
        try:
            script_dir = Path(__file__).parent
            image_path = script_dir.parent / "imgs" / "logo.png"
            pixmap = QPixmap(str(image_path))
            icon_pixmap = pixmap.scaled(32, 32, Qt.AspectRatioMode.KeepAspectRatio, Qt.TransformationMode.SmoothTransformation)
            app_icon = QIcon(icon_pixmap)
            self.setWindowIcon(app_icon)
            self.tray_icon = StatusTrayIcon(app_icon, self)
            self.tray_icon.show()
        except Exception as e:
            # Tray icon is optional; the app works without it
            print(f"Error setting up tray icon: {e}")
            self.tray_icon = None

    def setup_initial_state(self):
        """Set up initial state.

        Returns:
            dict: persisted settings merged with fresh chat/task state.
        """
        # Get settings from settings manager
        settings = self.settings_manager.get_settings()
        # Create state dictionary with settings and chat state
        state = {
            # Apply settings
            **settings,
            # Chat state
            "messages": [],
            "chatbox_messages": [],
            "auth_validated": False,
            "responses": {},
            "tools": {},
            "tasks": [],
            "stop": False
        }
        return state

    def register_stop_hotkey(self):
        """Register the global stop hotkey."""
        # Clean up existing hotkeys
        if self.hotkey_handler:
            try:
                keyboard.unhook(self.hotkey_handler)
                self.hotkey_handler = None
            except:
                pass
        try:
            keyboard.unhook_all_hotkeys()
        except:
            pass
        # Get the current hotkey from state
        hotkey = self.state.get("stop_hotkey", DEFAULT_STOP_HOTKEY)
        if not hotkey:
            return
        try:
            # The hotkey callback only emits a signal, so the real stop
            # handling runs on the main thread, not the keyboard thread
            self.hotkey_handler = keyboard.add_hotkey(hotkey, self._emit_stop_signal, suppress=False)
            print(f"Registered stop hotkey: {hotkey}")
        except Exception as e:
            print(f"Error registering hotkey '{hotkey}': {e}")
            try:
                keyboard.unhook_all()
                # Retry after removing all hooks; callback still only emits
                self.hotkey_handler = keyboard.add_hotkey(hotkey, self._emit_stop_signal, suppress=False)
                print(f"Registered stop hotkey (alternate method): {hotkey}")
            except Exception as e2:
                print(f"All attempts to register hotkey '{hotkey}' failed: {e2}")

    def _emit_stop_signal(self):
        """Safely emit the stop signal from the hotkey callback thread."""
        self.stop_signal.emit()

    def _stop_process_main_thread(self):
        """Perform stop handling safely on the main thread."""
        self.state["stop"] = True
        # Stop the worker thread
        if hasattr(self, 'worker') and self.worker is not None:
            self.worker.terminate()
        # Stop the recording/listening thread
        if hasattr(self, 'recording_manager') and hasattr(self.recording_manager, 'listen_thread'):
            if self.recording_manager.listen_thread is not None and self.recording_manager.listen_thread.isRunning():
                # Ask the listener thread to stop
                self.recording_manager.listen_thread.requestInterruption()
                self.recording_manager.listen_thread.wait(1000)  # wait at most 1 second
                if self.recording_manager.listen_thread.isRunning():
                    self.recording_manager.listen_thread.terminate()  # force-terminate
                # Clear related state
                self.recording_manager.listen_thread = None
                self.chat_panel.append_message("📝 录制已停止", "blue")
        # Remaining stop handling: restore the window and offer learning mode
        if self.isMinimized():
            self.showNormal()
        self.activateWindow()
        self.chat_panel.append_message("⚠️ Stopped by user", "red")
        # Use non-modal dialog
        learn_dialog = QMessageBox(self)
        learn_dialog.setIcon(QMessageBox.Icon.Question)
        learn_dialog.setWindowTitle("Learning Opportunity")
        learn_dialog.setText("Would you like to show the correct steps to improve the system?")
        learn_dialog.setStandardButtons(QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No)
        learn_dialog.setDefaultButton(QMessageBox.StandardButton.No)
        learn_dialog.setWindowModality(Qt.WindowModality.NonModal)
        learn_dialog.show()
        # Connect signal to callback function
        learn_dialog.buttonClicked.connect(self.handle_learn_dialog_response)

    def apply_theme(self):
        """Apply the current theme to the application."""
        apply_theme(self, self.state.get("theme", "Light"))

    def init_ui(self):
        """Initialize UI components."""
        central_widget = QWidget()
        main_layout = QVBoxLayout(central_widget)
        # Load top image
        header_layout = QVBoxLayout()
        try:
            script_dir = Path(__file__).parent
            image_path = script_dir.parent.parent / "imgs" / "header_bar_thin.png"
            if image_path.exists():
                pixmap = QPixmap(str(image_path))
                header_label = QLabel()
                header_label.setPixmap(pixmap.scaledToWidth(self.width()))
                header_layout.addWidget(header_label)
        except Exception as e:
            # Header image is decorative only
            print(f"Failed to load header image: {e}")
        title_label = QLabel("autoMate")
        title_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        font = title_label.font()
        font.setPointSize(20)
        title_label.setFont(font)
        header_layout.addWidget(title_label)
        # Introduction text
        intro_label = QLabel(INTRO_TEXT)
        intro_label.setWordWrap(True)
        font = intro_label.font()
        font.setPointSize(12)
        intro_label.setFont(font)
        # Settings button and clear chat button (at top)
        top_buttons_layout = QHBoxLayout()
        self.settings_button = QPushButton("Settings")
        self.settings_button.clicked.connect(self.open_settings_dialog)
        self.clear_button = QPushButton("Clear Chat")
        self.clear_button.clicked.connect(self.clear_chat)
        top_buttons_layout.addWidget(self.settings_button)
        top_buttons_layout.addWidget(self.clear_button)
        top_buttons_layout.addStretch()  # Add elastic space to left-align buttons
        # Input area
        input_layout = QHBoxLayout()
        self.chat_input = QLineEdit()
        self.chat_input.setPlaceholderText("Type a message to send to Omniparser + X ...")
        # Send message on Enter key
        self.chat_input.returnPressed.connect(self.process_input)
        self.submit_button = QPushButton("Send")
        self.submit_button.clicked.connect(self.process_input)
        self.stop_button = QPushButton("Stop")
        self.stop_button.clicked.connect(self.stop_process)
        input_layout.addWidget(self.chat_input, 8)
        input_layout.addWidget(self.submit_button, 1)
        input_layout.addWidget(self.stop_button, 1)
        # Main content area
        content_splitter = QSplitter(Qt.Orientation.Horizontal)
        # Task panel
        self.task_panel = TaskPanel()
        # Chat panel
        self.chat_panel = ChatPanel()
        # Add to splitter
        content_splitter.addWidget(self.task_panel)
        content_splitter.addWidget(self.chat_panel)
        content_splitter.setSizes([int(self.width() * 0.2), int(self.width() * 0.8)])
        # Add all components to main layout
        main_layout.addLayout(header_layout)
        main_layout.addWidget(intro_label)
        main_layout.addLayout(top_buttons_layout)  # Add top button area
        main_layout.addLayout(input_layout)
        main_layout.addWidget(content_splitter, 1)  # 1 is the stretch factor
        self.setCentralWidget(central_widget)

    def open_settings_dialog(self):
        """Open the settings dialog and apply any accepted changes."""
        dialog = SettingsDialog(self, self.state)
        result = dialog.exec()
        if result == QDialog.DialogCode.Accepted:
            # Get and apply new settings
            new_settings = dialog.get_settings()
            # Update settings in settings manager
            changes = self.settings_manager.update_settings(new_settings)
            # Update state with new settings
            self.state.update(new_settings)
            # Apply theme change if needed
            if changes["theme_changed"]:
                self.apply_theme()
            # Update hotkey if changed
            if changes["hotkey_changed"]:
                self.register_stop_hotkey()
            # Save settings to config
            self.settings_manager.save_to_config()

    def process_input(self):
        """Process user input: launch the agent worker and minimize."""
        user_input = self.chat_input.text()
        if not user_input.strip():
            return
        # Clear input box
        self.chat_input.clear()
        # Minimize main window so the agent can operate the desktop
        self.showMinimized()
        # Create and start worker thread
        self.worker = AgentWorker(user_input, self.state, self.vision_agent)
        self.worker.update_signal.connect(self.update_ui)
        self.worker.error_signal.connect(self.handle_error)
        # Connect signals to tray icon if available
        if hasattr(self, 'tray_icon') and self.tray_icon is not None:
            self.worker.status_signal.connect(self.tray_icon.update_status)
            self.worker.task_signal.connect(self.tray_icon.update_task)
        self.worker.start()

    def handle_error(self, error_message):
        """Handle error messages from the worker thread."""
        # Restore main window to show the error
        self.showNormal()
        self.activateWindow()
        # Show error message
        QMessageBox.warning(self, "Connection Error",
                            f"Error connecting to AI service:\n{error_message}\n\nPlease check your network connection and API settings.")

    @pyqtSlot(list, list)
    def update_ui(self, chatbox_messages, tasks):
        """Update UI display from worker-thread data."""
        # Update chat display
        self.chat_panel.update_chat(chatbox_messages)
        # Update task table
        self.task_panel.update_tasks(tasks)

    def stop_process(self):
        """Stop processing - handle the Stop button click."""
        # Call the main-thread handler directly: button clicks already
        # arrive on the main thread
        self._stop_process_main_thread()

    def handle_learn_dialog_response(self, button):
        """Start demonstration mode when the user accepts the learn dialog.

        NOTE(review): matching on button.text() == "&Yes" depends on Qt's
        mnemonic text for the standard Yes button — confirm this holds across
        platforms/locales.
        """
        if button.text() == "&Yes":
            self.showMinimized()
            self.recording_manager.start_demonstration()
            # Update chat to show demonstration mode is active
            self.chat_panel.append_message("📝 Demonstration mode activated. Please perform the correct actions.", "green")

    def clear_chat(self):
        """Clear chat history and task state."""
        self.state["messages"] = []
        self.state["chatbox_messages"] = []
        self.state["responses"] = {}
        self.state["tools"] = {}
        self.state["tasks"] = []
        self.chat_panel.clear()
        self.task_panel.clear()

    def closeEvent(self, event):
        """Tear down global hooks and the worker on window close.

        NOTE(review): self.worker.terminate() runs after event.accept() —
        confirm the worker shutdown ordering is intentional.
        """
        keyboard.unhook_all()
        event.accept()
        if hasattr(self, 'worker') and self.worker is not None:
            self.worker.terminate()
# Application entry point
def main():
    """Ad-hoc entry point for running this module directly.

    NOTE(review): MainWindow is given raw sys.argv here, while ui/main.py
    passes an argparse Namespace — confirm whether this duplicate entry
    point is still needed or should parse arguments first.
    """
    app = QApplication(sys.argv)
    window = MainWindow(sys.argv)
    window.show()
    sys.exit(app.exec())  # Note: PyQt6 uses exec(), not the old exec_()
if __name__ == "__main__":
    main()

View File

@@ -1,97 +0,0 @@
"""
Recording manager for autoMate
Handles recording and demonstration functionality
"""
import yaml
from auto_control.agent.few_shot_generate_agent import FewShotGenerateAgent
from util.auto_control import AutoControl
from ui.demonstration_panel import DemonstrationPanel
from PyQt6.QtCore import QThread, pyqtSignal
import time
import os
class ActionListenThread(QThread):
    """Background thread that keeps the action listener alive until interrupted."""

    finished_signal = pyqtSignal()

    def __init__(self, action_listen):
        super().__init__()
        self.action_listen = action_listen

    def run(self):
        """Start the listener, idle until interruption is requested, then clean up."""
        try:
            self.action_listen.start_listen()
            # Poll for the owner's requestInterruption() call.
            while True:
                if self.isInterruptionRequested():
                    break
                time.sleep(0.1)
        except Exception as exc:
            print(f"Action listening error: {exc}")
        finally:
            # Always stop the listener and notify the owner, even on error.
            try:
                self.action_listen.stop_listen()
                self.finished_signal.emit()
            except Exception as exc:
                print(f"Cleanup error: {exc}")
class RecordingManager:
    """Coordinates demonstration recording: captures user actions, then distills
    them into few-shot examples saved under ~/.automate.
    """

    def __init__(self, parent=None):
        # parent: the main window, minimized/restored around a demonstration.
        self.parent = parent
        self.recording_in_progress = False
        self.recording_indicator = None
        self.demo_panel = None
        self.demonstration_mode = False
        self.action_listen = AutoControl()

    def start_demonstration(self):
        """Start demonstration mode for system learning."""
        self.demonstration_mode = True
        # Hide the main window so it doesn't appear in the demonstration.
        if self.parent:
            self.parent.showMinimized()
        # Independent floating panel gives the user a visible stop button.
        self.demo_panel = DemonstrationPanel(stop_callback=self.stop_demonstration)
        self.demo_panel.show()
        # Listen for user actions on a background thread; when it finishes,
        # the recorded actions are processed into few-shot examples.
        self.listen_thread = ActionListenThread(self.action_listen)
        self.listen_thread.finished_signal.connect(self.process_recorded_actions)
        self.listen_thread.start()

    def stop_demonstration(self):
        """Stop demonstration mode; processing runs via the thread's finished signal."""
        self.listen_thread.requestInterruption()
        # Close the independent demonstration control panel.
        if self.demo_panel:
            self.demo_panel.close()
            self.demo_panel = None
        # Restore the main window.
        if self.parent:
            self.parent.showNormal()
        self.demonstration_mode = False

    def process_recorded_actions(self):
        """Turn the recorded actions into few-shot examples and persist them."""
        recorded_actions = self.action_listen.auto_list
        few_shot_generate_agent = FewShotGenerateAgent()
        few_shot = few_shot_generate_agent(recorded_actions)
        # Save under ~/.automate, creating the directory on first use
        # (exist_ok avoids the racy exists()/makedirs() pair).
        automate_dir = os.path.expanduser("~/.automate")
        os.makedirs(automate_dir, exist_ok=True)
        few_shot_path = os.path.join(automate_dir, "few_shot.yaml")
        with open(few_shot_path, "w", encoding="utf-8") as f:
            yaml.dump(few_shot, f, allow_unicode=True)
        print(f"Few shot examples saved to {few_shot_path}")

View File

@@ -1,125 +0,0 @@
"""
Settings dialog for application configuration
"""
from PyQt6.QtWidgets import (QDialog, QVBoxLayout, QHBoxLayout,
QLabel, QLineEdit, QPushButton, QComboBox)
from PyQt6.QtCore import QTimer
from ui.hotkey_edit import HotkeyEdit, DEFAULT_STOP_HOTKEY
from ui.theme import THEMES
class SettingsDialog(QDialog):
    """Dialog for application settings (model, API access, theme, hotkey, region)."""

    def __init__(self, parent=None, state=None):
        """Build the dialog; *state* is the shared settings dict it reads from."""
        super().__init__(parent)
        self.state = state
        self.parent_window = parent
        self.setWindowTitle("Settings")
        self.setMinimumWidth(500)
        self.init_ui()

    def init_ui(self):
        """Lay out all setting rows top-to-bottom, ending with OK/Cancel."""
        layout = QVBoxLayout(self)
        # Model settings
        model_layout = QHBoxLayout()
        model_label = QLabel("Model:")
        self.model_input = QLineEdit(self.state["model"])
        model_layout.addWidget(model_label)
        model_layout.addWidget(self.model_input)
        # Base URL settings
        url_layout = QHBoxLayout()
        url_label = QLabel("Base URL:")
        self.base_url_input = QLineEdit(self.state["base_url"])
        url_layout.addWidget(url_label)
        url_layout.addWidget(self.base_url_input)
        # API key settings (masked while typing)
        api_layout = QHBoxLayout()
        api_label = QLabel("API Key:")
        self.api_key_input = QLineEdit(self.state["api_key"])
        self.api_key_input.setEchoMode(QLineEdit.EchoMode.Password)
        api_layout.addWidget(api_label)
        api_layout.addWidget(self.api_key_input)
        # Theme selection, pre-set to the current theme (defaults to Light)
        theme_layout = QHBoxLayout()
        theme_label = QLabel("Theme:")
        self.theme_combo = QComboBox()
        self.theme_combo.addItems(list(THEMES.keys()))
        current_theme = self.state.get("theme", "Light")
        self.theme_combo.setCurrentText(current_theme)
        theme_layout.addWidget(theme_label)
        theme_layout.addWidget(self.theme_combo)
        # Stop hotkey setting
        hotkey_layout = QHBoxLayout()
        hotkey_label = QLabel("Stop Hotkey:")
        self.hotkey_edit = HotkeyEdit(self.state.get("stop_hotkey", DEFAULT_STOP_HOTKEY))
        hotkey_layout.addWidget(hotkey_label)
        hotkey_layout.addWidget(self.hotkey_edit)
        # Screen region selection (button + label echoing the current choice)
        region_layout = QHBoxLayout()
        self.select_region_btn = QPushButton("Select Screen Region")
        self.region_info = QLabel("No region selected" if "screen_region" not in self.state else f"Selected region: {self.state['screen_region']}")
        self.select_region_btn.clicked.connect(self.select_screen_region)
        region_layout.addWidget(self.select_region_btn)
        region_layout.addWidget(self.region_info)
        # OK and Cancel buttons
        button_layout = QHBoxLayout()
        self.ok_button = QPushButton("OK")
        self.cancel_button = QPushButton("Cancel")
        self.ok_button.clicked.connect(self.accept)
        self.cancel_button.clicked.connect(self.reject)
        button_layout.addWidget(self.ok_button)
        button_layout.addWidget(self.cancel_button)
        # Add all rows to the main layout in display order
        layout.addLayout(model_layout)
        layout.addLayout(url_layout)
        layout.addLayout(api_layout)
        layout.addLayout(theme_layout)
        layout.addLayout(hotkey_layout)
        layout.addLayout(region_layout)
        layout.addLayout(button_layout)

    def select_screen_region(self):
        """Select screen region, minimizing the parent window out of the way first."""
        if self.parent_window:
            self.parent_window.showMinimized()
            # Wait a moment for the minimize animation before overlaying the screen
            QTimer.singleShot(500, self._do_select_region)
        else:
            self._do_select_region()

    def _do_select_region(self):
        """Actual region selection after minimizing; updates state and the label."""
        from util.screen_selector import ScreenSelector
        region = ScreenSelector().get_selection()
        # Restore the dialog and parent window
        self.activateWindow()
        if self.parent_window:
            self.parent_window.showNormal()
            self.parent_window.activateWindow()
        if region:
            self.state["screen_region"] = region
            self.region_info.setText(f"Selected region: {region}")
        else:
            self.region_info.setText("Selection cancelled")

    def get_settings(self):
        """Return the edited settings as a plain dict (caller merges into state)."""
        return {
            "model": self.model_input.text(),
            "base_url": self.base_url_input.text(),
            "api_key": self.api_key_input.text(),
            "screen_region": self.state.get("screen_region", None),
            "theme": self.theme_combo.currentText(),
            "stop_hotkey": self.hotkey_edit.get_hotkey()
        }

View File

@@ -1,59 +0,0 @@
"""
Settings manager for autoMate
Handles loading, saving, and updating application settings
"""
from xbrain.utils.config import Config
from ui.hotkey_edit import DEFAULT_STOP_HOTKEY
class SettingsManager:
    """Owns the application settings dict and its persistence via Config."""

    def __init__(self):
        self.config = Config()
        self.settings = self.load_initial_settings()

    def load_initial_settings(self):
        """Build the default settings dict, seeded from any saved config values."""
        return {
            "api_key": self.config.OPENAI_API_KEY or "",
            "base_url": self.config.OPENAI_BASE_URL or "https://api.openai.com/v1",
            "model": self.config.OPENAI_MODEL or "gpt-4o",
            "theme": "Light",
            "stop_hotkey": DEFAULT_STOP_HOTKEY,
            "only_n_most_recent_images": 2,
            "screen_region": None,
        }

    def get_settings(self):
        """Return the live settings dict (not a copy)."""
        return self.settings

    def update_settings(self, new_settings):
        """Merge *new_settings* in; report which reactive settings changed."""
        def _changed(key):
            # A key "changes" only if it is present AND differs from the current value.
            return key in new_settings and new_settings[key] != self.settings.get(key)

        hotkey_changed = _changed("stop_hotkey")
        theme_changed = _changed("theme")
        self.settings.update(new_settings)
        return {
            "hotkey_changed": hotkey_changed,
            "theme_changed": theme_changed,
        }

    def save_to_config(self):
        """Persist the API-related settings back to the config file."""
        for attr, key, default in (
            ("OPENAI_API_KEY", "api_key", ""),
            ("OPENAI_BASE_URL", "base_url", "https://api.openai.com/v1"),
            ("OPENAI_MODEL", "model", "gpt-4o"),
        ):
            setattr(self.config, attr, self.settings.get(key, default))
        self.config.save()

View File

@@ -1,30 +0,0 @@
"""
Task panel for autoMate
"""
from PyQt6.QtWidgets import QWidget, QVBoxLayout, QLabel, QTableWidget, QTableWidgetItem, QHeaderView
class TaskPanel(QWidget):
    """Panel showing the agent's task list as a two-column (status, task) table."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setup_ui()

    def setup_ui(self):
        """Initialize task panel UI: a header label above a stretchable table."""
        layout = QVBoxLayout(self)
        header = QLabel("Task List")
        self.task_table = QTableWidget(0, 2)
        self.task_table.setHorizontalHeaderLabels(["Status", "Task"])
        # Let the task-description column absorb all remaining width.
        self.task_table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)
        layout.addWidget(header)
        layout.addWidget(self.task_table)

    def update_tasks(self, tasks):
        """Replace the table contents with *tasks*, a list of (status, task) pairs."""
        self.task_table.setRowCount(len(tasks))
        for row, (status, description) in enumerate(tasks):
            self.task_table.setItem(row, 0, QTableWidgetItem(status))
            self.task_table.setItem(row, 1, QTableWidgetItem(description))

    def clear(self):
        """Drop every row from the table."""
        self.task_table.setRowCount(0)

View File

@@ -1,99 +0,0 @@
"""
Theme definitions and theme handling functionality
"""
# Theme definitions: each theme maps role names to hex colors used by apply_theme().
THEMES = {
    "Light": {
        "main_bg": "#F5F5F5",
        "widget_bg": "#FFFFFF",
        "text": "#333333",
        "accent": "#4A86E8",
        "button_bg": "#E3E3E3",
        "button_text": "#333333",
        "border": "#CCCCCC",
        "selection_bg": "#D0E2F4"
    },
    "Dark": {
        "main_bg": "#2D2D2D",
        "widget_bg": "#3D3D3D",
        "text": "#FFFFFF",
        "accent": "#4A86E8",
        "button_bg": "#555555",
        "button_text": "#FFFFFF",
        "border": "#555555",
        "selection_bg": "#3A5F8A"
    }
}


def apply_theme(widget, theme_name="Light"):
    """Apply the named theme's stylesheet to *widget* (and its children).

    Fix: an unknown *theme_name* (e.g. a stale value persisted in settings)
    used to raise KeyError; it now falls back to the "Light" theme.
    """
    theme = THEMES.get(theme_name, THEMES["Light"])
    # Create stylesheet for the application
    stylesheet = f"""
    QMainWindow, QDialog {{
        background-color: {theme['main_bg']};
        color: {theme['text']};
    }}
    QWidget {{
        background-color: {theme['main_bg']};
        color: {theme['text']};
    }}
    QLabel {{
        color: {theme['text']};
    }}
    QPushButton {{
        background-color: {theme['button_bg']};
        color: {theme['button_text']};
        border: 1px solid {theme['border']};
        border-radius: 4px;
        padding: 5px 10px;
    }}
    QPushButton:hover {{
        background-color: {theme['accent']};
        color: white;
    }}
    QLineEdit, QTextEdit, QTableWidget, QComboBox {{
        background-color: {theme['widget_bg']};
        color: {theme['text']};
        border: 1px solid {theme['border']};
        border-radius: 4px;
        padding: 4px;
    }}
    QTextEdit {{
        background-color: {theme['widget_bg']};
    }}
    QTableWidget::item:selected {{
        background-color: {theme['selection_bg']};
    }}
    QHeaderView::section {{
        background-color: {theme['button_bg']};
        color: {theme['button_text']};
        padding: 4px;
        border: 1px solid {theme['border']};
    }}
    QSplitter::handle {{
        background-color: {theme['border']};
    }}
    QScrollBar {{
        background-color: {theme['widget_bg']};
    }}
    QScrollBar::handle {{
        background-color: {theme['button_bg']};
        border-radius: 4px;
    }}
    """
    widget.setStyleSheet(stylesheet)

View File

@@ -1,60 +0,0 @@
"""
System tray icon implementation
"""
from PyQt6.QtWidgets import QSystemTrayIcon, QMenu, QApplication
from PyQt6.QtGui import QAction
class StatusTrayIcon(QSystemTrayIcon):
    """System tray icon that mirrors the application's current status and task."""

    def __init__(self, icon, parent=None):
        super().__init__(icon, parent)
        self.parent = parent
        self.setToolTip("autoMate")
        # Context menu: show-window action, two read-only info lines, exit.
        self.menu = QMenu()
        self.show_action = QAction("Show Main Window")
        self.show_action.triggered.connect(self.show_main_window)
        self.menu_status = QAction("Status: Idle")
        self.menu_status.setEnabled(False)
        self.menu_task = QAction("Task: None")
        self.menu_task.setEnabled(False)
        self.exit_action = QAction("Exit")
        self.exit_action.triggered.connect(QApplication.quit)
        self.menu.addAction(self.show_action)
        self.menu.addSeparator()
        self.menu.addAction(self.menu_status)
        self.menu.addAction(self.menu_task)
        self.menu.addSeparator()
        self.menu.addAction(self.exit_action)
        self.setContextMenu(self.menu)
        # Double-click on the icon restores the main window.
        self.activated.connect(self.icon_activated)

    @staticmethod
    def _shorten(text):
        """Trim *text* so it fits inside a menu entry."""
        return text[:50] + "..." if len(text) > 50 else text

    def show_main_window(self):
        """Restore and focus the main window."""
        if self.parent:
            self.parent.showNormal()
            self.parent.activateWindow()

    def icon_activated(self, reason):
        """React to tray-icon activation (double-click restores the window)."""
        if reason == QSystemTrayIcon.ActivationReason.DoubleClick:
            self.show_main_window()

    def update_status(self, status_text):
        """Reflect the status in the menu and flash a brief tray notification."""
        self.menu_status.setText(f"Status: {self._shorten(status_text)}")
        # Only 500 ms so notifications don't interfere with visual automation.
        self.showMessage("autoMate Status", status_text, QSystemTrayIcon.MessageIcon.Information, 500)

    def update_task(self, task_text):
        """Reflect the current task in the tray menu."""
        self.menu_task.setText(f"Task: {self._shorten(task_text)}")

View File

@@ -1,174 +0,0 @@
import sys
import os
import time
# Add the project root directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pynput import mouse, keyboard
# Now you can import from auto_control
from auto_control.tools.screen_capture import get_screenshot
class ActionRecord:
    """Normalized container for a single recorded user action."""

    def __init__(self,
                 action_type: str,
                 position: tuple = (0, 0),
                 button: str = "",
                 key: str = "",
                 text: str = "",
                 base64_image=None):
        # Kept as a plain dict so records serialize/inspect easily downstream.
        self.data = dict(
            type=action_type,            # 'click', 'key_press', 'text_input', ...
            timestamp=time.time(),       # when the action was recorded
            position=position,           # mouse position (or input position)
            button=button,               # mouse button name, if any
            key=key,                     # keyboard key name, if any
            text=text,                   # accumulated typed text, if any
            base64_image=base64_image,   # screenshot taken at action time
        )
class AutoControl:
    """Records user mouse/keyboard activity (with screenshots) into self.auto_list.

    Fix: ``stop_listen`` was defined twice in this class; the second definition
    silently overrode the first. The two are merged into a single method.
    """

    def __init__(self):
        self.auto_list = []      # chronological list of ActionRecord.data dicts
        self.text_buffer = []    # pending printable keys, flushed as one text_input
        self.last_key_time = 0   # timestamp of the last printable keypress
        self.input_timeout = 1.0  # seconds of inactivity that ends a text run

    def start_listen(self):
        """Start the pynput mouse and keyboard listeners (non-blocking)."""
        self.mouse_listener = mouse.Listener(
            on_click=self.on_click,
            on_scroll=self.on_scroll)
        self.keyboard_listener = keyboard.Listener(
            on_press=self.on_press,
            on_release=self.on_release)
        self.mouse_listener.start()
        self.keyboard_listener.start()

    def stop_listen(self):
        """Stop both listeners; the recorded actions remain in self.auto_list.

        NOTE(review): any pending self.text_buffer content is NOT flushed here —
        confirm whether trailing typed text should be recorded on stop.
        """
        self.keyboard_listener.stop()
        self.mouse_listener.stop()

    def on_click(self, x, y, button, pressed, injected):
        """Record a click (on button release) together with a screenshot."""
        if not pressed:
            screenshot, _ = get_screenshot(is_base64=True)
            record = ActionRecord(
                action_type="click",
                position=(x, y),
                button=str(button),
                base64_image=screenshot
            )
            self.auto_list.append(record.data)

    def on_scroll(self, x, y, dx, dy, injected):
        """Record a scroll event, encoding its vertical direction as text."""
        screenshot, _ = get_screenshot(is_base64=True)
        record = ActionRecord(
            action_type="scroll",
            text=f"{'down' if dy < 0 else 'up'}",
            base64_image=screenshot
        )
        self.auto_list.append(record.data)

    def crop_image_if_position_in_coordinates(self, image, image_path, position, coordinates):
        """Crop *image* to *coordinates* if *position* lies inside them.

        Args:
            image: PIL-style image object (needs .crop() and .save()).
            image_path: path of the source image; the crop is saved next to it
                with a ``_cropped`` suffix.
            position: (x, y) point to test.
            coordinates: (x1, y1, x2, y2) target area.

        Returns:
            bool: True if *position* was inside *coordinates* (and a crop was saved).
        """
        x, y = position
        x1, y1, x2, y2 = coordinates
        if (x1 <= x <= x2) and (y1 <= y <= y2):
            cropped_image = image.crop(coordinates)
            # Save alongside the original, e.g. shot.png -> shot_cropped.png.
            save_path = str(image_path).replace('.png', '_cropped.png')
            cropped_image.save(save_path, 'PNG')
            return True
        return False

    def on_press(self, key, injected):
        """Buffer printable keys into a text run; record special keys immediately."""
        try:
            current_time = time.time()
            try:
                char = key.char
            except AttributeError:
                # Non-printable key: flush any text run ended by space/enter,
                # then record the special key press with a screenshot.
                if self.text_buffer and key in [keyboard.Key.space, keyboard.Key.enter]:
                    self._process_text_buffer()
                screenshot, _ = get_screenshot(is_base64=True)
                record = ActionRecord(
                    action_type="key_press",
                    key=str(key),
                    base64_image=screenshot
                )
                self.auto_list.append(record.data)
                return
            # A long pause ends the previous text run before starting a new one.
            if current_time - self.last_key_time > self.input_timeout and self.text_buffer:
                self._process_text_buffer()
            self.text_buffer.append(char)
            self.last_key_time = current_time
        except Exception as e:
            print(f"Error in on_press: {e}")

    def on_release(self, key, injected):
        """Flush the text buffer on enter/tab and record special-key releases."""
        try:
            if key in [keyboard.Key.enter, keyboard.Key.tab]:
                if self.text_buffer:
                    self._process_text_buffer()
            if not hasattr(key, 'char'):
                screenshot, _ = get_screenshot(is_base64=True)
                record = ActionRecord(
                    action_type="special_key",
                    key=str(key),
                    base64_image=screenshot
                )
                self.auto_list.append(record.data)
        except Exception as e:
            print(f"Error in on_release: {e}")

    def _process_text_buffer(self):
        """Flush buffered printable keys as one text_input action (with screenshot)."""
        if not self.text_buffer:
            return
        text = ''.join(self.text_buffer)
        screenshot, _ = get_screenshot(is_base64=True)
        record = ActionRecord(
            action_type="text_input",
            text=text,
            base64_image=screenshot
        )
        self.auto_list.append(record.data)
        self.text_buffer = []
if __name__ == "__main__":
    # Manual test: start recording and keep listening until the process is killed.
    auto_control = AutoControl()
    auto_control.start_listen()

View File

@@ -1,34 +0,0 @@
import os
import platform
import pyautogui
from enum import Enum
import pyperclip
class AppName(Enum):
    """Supported target applications; the value doubles as the image folder name."""
    WECHAT = "wechat"


class AutoUtil:
    """Image-matching UI automation helper bound to one application's templates."""

    def __init__(self, app_name: AppName):
        # Template screenshots live under imgs/<app>/ relative to the package root.
        self.img_dir = os.path.join(os.path.dirname(__file__), "..", "imgs", app_name.value)

    def click_multi_img(self, img_names, offset_x=0, offset_y=0, minSearchTime=0):
        """Click each named template in order, applying the same offsets to all."""
        for name in img_names:
            self.find_click_img(name, offset_x, offset_y, minSearchTime)

    def find_click_img(self, img_name, offset_x=0, offset_y=0, minSearchTime=0):
        """Locate ``<img_name>.png`` on screen and click its center plus the offsets."""
        template = os.path.join(self.img_dir, img_name + ".png")
        location = pyautogui.locateOnScreen(template, minSearchTime=minSearchTime)
        center_x, center_y = pyautogui.center(location)
        pyautogui.click(center_x + offset_x, center_y + offset_y)

    def send_text(self, text):
        """Paste *text* via the clipboard, then restore the previous clipboard content."""
        saved_clipboard = pyperclip.paste()
        pyperclip.copy(text)
        if platform.system() == 'Darwin':
            pyautogui.hotkey('command', 'v', interval=0.1)
        else:
            pyautogui.hotkey('ctrl', 'v')
        # Put the user's original clipboard content back.
        pyperclip.copy(saved_clipboard)

View File

@@ -1,17 +0,0 @@
import os
from pathlib import Path
# All model weights are downloaded beneath this directory.
__WEIGHTS_DIR = Path("weights")
# Expected on-disk location after download(); "v2___0" appears to be how the
# ModelScope cache escapes the dots in "OmniParser-v2.0" — confirm against the
# actual cache layout.
OMNI_PARSER_DIR = os.path.join(__WEIGHTS_DIR, "AI-ModelScope", "OmniParser-v2___0")
def download():
    """Download the OmniParser v2 icon-detection weights into ./weights via ModelScope."""
    # Imported lazily so merely importing this module doesn't require modelscope.
    from modelscope import snapshot_download
    # Create weights directory
    __WEIGHTS_DIR.mkdir(exist_ok=True)
    snapshot_download(
        'AI-ModelScope/OmniParser-v2.0',
        cache_dir='weights',
        # Only the icon-detection checkpoint is needed.
        allow_file_pattern=['icon_detect/model.pt']
    )


if __name__ == "__main__":
    download()

View File

@@ -1,149 +0,0 @@
import tkinter as tk
from tkinter import Button
import sys
class ScreenSelector:
    """Full-screen overlay that lets the user drag-select a rectangular screen region."""

    def __init__(self):
        self.root = tk.Tk()
        self.root.withdraw()
        # Create a full-screen, semi-transparent, always-on-top overlay window
        self.window = tk.Toplevel(self.root)
        self.window.attributes("-fullscreen", True)
        self.window.attributes("-alpha", 0.6)
        self.window.attributes("-topmost", True)
        # Selection state
        self.start_x = self.start_y = self.current_x = self.current_y = None
        self.selection_rect = self.confirm_button = None
        self.result = None
        # Canvas the user draws the selection on
        self.canvas = tk.Canvas(self.window, bg="gray20", highlightthickness=0)
        self.canvas.pack(fill=tk.BOTH, expand=True)
        # Mouse drag draws the selection; Escape cancels
        self.canvas.bind("<ButtonPress-1>", self.on_press)
        self.canvas.bind("<B1-Motion>", self.on_drag)
        self.canvas.bind("<ButtonRelease-1>", self.on_release)
        self.window.bind("<Escape>", self.cancel)

    def on_press(self, event):
        """Begin a new selection, discarding any previous one."""
        # Clear the existing selection rectangle and confirm button
        if self.selection_rect:
            self.canvas.delete(self.selection_rect)
        if self.confirm_button:
            self.confirm_button.destroy()
            self.confirm_button = None
        self.start_x = self.canvas.canvasx(event.x)
        self.start_y = self.canvas.canvasy(event.y)
        self.selection_rect = self.canvas.create_rectangle(
            self.start_x, self.start_y, self.start_x, self.start_y,
            outline="red", width=5
        )

    def on_drag(self, event):
        """Resize the selection rectangle to follow the mouse."""
        self.current_x = self.canvas.canvasx(event.x)
        self.current_y = self.canvas.canvasy(event.y)
        # Update the selection rectangle
        self.canvas.coords(self.selection_rect,
                           self.start_x, self.start_y,
                           self.current_x, self.current_y)
        # Redraw the see-through cut-out
        self.update_region()

    def update_region(self):
        """Repaint the dimmed backdrop with a clear cut-out over the selection."""
        self.canvas.delete("transparent_region")
        # Normalized selection corners
        x1 = min(self.start_x, self.current_x)
        y1 = min(self.start_y, self.current_y)
        x2 = max(self.start_x, self.current_x)
        y2 = max(self.start_y, self.current_y)
        # Dimmed backdrop plus an empty rectangle marking the chosen area
        self.canvas.create_rectangle(
            0, 0, self.window.winfo_width(), self.window.winfo_height(),
            fill="gray20", stipple="gray50", tags="transparent_region"
        )
        self.canvas.create_rectangle(
            x1, y1, x2, y2, fill="", outline="", tags="transparent_region"
        )
        # Keep the red outline above the backdrop
        self.canvas.tag_raise(self.selection_rect)

    def on_release(self, event):
        """Finish the drag; offer a confirm button for non-trivial selections."""
        self.current_x = self.canvas.canvasx(event.x)
        self.current_y = self.canvas.canvasy(event.y)
        # Ignore tiny (accidental) selections
        if abs(self.current_x - self.start_x) > 5 and abs(self.current_y - self.start_y) > 5:
            self.show_button()

    def show_button(self):
        """Place a Confirm button at the selection corner nearest the mouse."""
        if self.confirm_button:
            self.confirm_button.destroy()
        # Normalized selection corners
        x1 = min(self.start_x, self.current_x)
        y1 = min(self.start_y, self.current_y)
        x2 = max(self.start_x, self.current_x)
        y2 = max(self.start_y, self.current_y)
        # Squared distance from the cursor to each corner, paired with a
        # candidate button position just outside that corner
        distances = [
            ((self.current_x - x1)**2 + (self.current_y - y1)**2, (x1 - 90, y1 - 40)),  # top-left
            ((self.current_x - x2)**2 + (self.current_y - y1)**2, (x2 + 10, y1 - 40)),  # top-right
            ((self.current_x - x1)**2 + (self.current_y - y2)**2, (x1 - 90, y2 + 10)),  # bottom-left
            ((self.current_x - x2)**2 + (self.current_y - y2)**2, (x2 + 10, y2 + 10))   # bottom-right
        ]
        # Pick the nearest corner
        btn_x, btn_y = min(distances, key=lambda d: d[0])[1]
        # Clamp so the button stays on screen
        width, height = self.window.winfo_width(), self.window.winfo_height()
        if btn_x + 80 > width: btn_x = x1 - 90
        if btn_x < 0: btn_x = x2 + 10
        if btn_y < 0: btn_y = y2 + 10
        if btn_y + 30 > height: btn_y = y1 - 40
        # Create the confirm button
        self.confirm_button = Button(
            self.window, text="Confirm", command=self.confirm,
            bg="white", fg="black", font=("Arial", 12, "bold"),
            padx=10, pady=5
        )
        self.confirm_button.place(x=btn_x, y=btn_y)

    def confirm(self):
        """Record the selected region as (x1, y1, x2, y2) and close the overlay."""
        # Normalized selection corners
        x1 = min(self.start_x, self.current_x)
        y1 = min(self.start_y, self.current_y)
        x2 = max(self.start_x, self.current_x)
        y2 = max(self.start_y, self.current_y)
        self.result = (int(x1), int(y1), int(x2), int(y2))
        self.root.quit()
        self.window.destroy()

    def cancel(self, event=None):
        """Abort the selection (Escape key) and close the overlay."""
        self.result = None
        self.root.quit()
        self.window.destroy()

    def get_selection(self):
        """Run the overlay's event loop; return (x1, y1, x2, y2) or None if cancelled."""
        self.root.mainloop()
        if hasattr(self, 'root') and self.root:
            self.root.destroy()
        return self.result
if __name__ == "__main__":
    # Manual test: run the selector and print the chosen region.
    region = ScreenSelector().get_selection()
    print(f"Selected region: {region}")
    sys.exit(0)

View File

@@ -1,20 +0,0 @@
import os
import pyautogui
from PIL import Image
from io import BytesIO
def capture_screen_with_cursor():
    """Capture the screen with the mouse cursor composited in.

    Fix: the cursor image opened with ``Image.open`` was never closed, leaking
    a file handle per call; it is now opened in a ``with`` block.

    Returns:
        BytesIO: an in-memory PNG of the screenshot, rewound to offset 0.
    """
    cursor_path = os.path.join(os.path.dirname(__file__), "..", "imgs", "cursor.png")
    screenshot = pyautogui.screenshot()
    cursor_x, cursor_y = pyautogui.position()
    with Image.open(cursor_path) as cursor:
        # Shrink the stock cursor graphic so it matches typical on-screen size.
        scaled = cursor.resize((int(cursor.width / 1.5), int(cursor.height / 1.5)))
        # The third argument uses the cursor's own alpha channel as paste mask.
        screenshot.paste(scaled, (cursor_x, cursor_y), scaled)
    img_io = BytesIO()
    screenshot.save(img_io, 'PNG')
    img_io.seek(0)
    return img_io

View File

@@ -1,30 +0,0 @@
import os
import sys
import time
import pyautogui
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from util.auto_util import AppName, AutoUtil
class WechatAuto:
    """Image-driven automation for the WeChat desktop client."""

    def __init__(self):
        self.auto_util = AutoUtil(AppName.WECHAT)

    def go_to_chat(self):
        """Switch to the chat tab.

        Fix: this used to pass "chat_unselect.png", but find_click_img appends
        ".png" itself, so it searched for "chat_unselect.png.png". Pass the bare
        template name, matching search_friend below.
        """
        self.auto_util.find_click_img("chat_unselect")

    def search_friend(self, friend_name):
        """Open the search box, find *friend_name*, and enter the conversation."""
        # The chat tab icon differs depending on whether it is selected.
        try:
            self.auto_util.find_click_img("chat_unselect")
        except pyautogui.ImageNotFoundException:
            self.auto_util.find_click_img("chat_select")
        self.auto_util.find_click_img("search", offset_x=100)
        self.auto_util.send_text(friend_name)
        self.auto_util.find_click_img("contact_person", offset_x=100, offset_y=100, minSearchTime=10)
        self.auto_util.find_click_img("search", offset_x=-100, offset_y=-100, minSearchTime=10)
if __name__ == "__main__":
    # Give the user time to bring the WeChat window to the foreground.
    time.sleep(3)
    wechat_auto = WechatAuto()
    wechat_auto.search_friend("李杨林")