支持自动化操作

This commit is contained in:
yuruo 2025-03-04 10:59:31 +08:00
parent 20bd0dd870
commit 5e9da64ecd
6 changed files with 66 additions and 129 deletions

3
.gitignore vendored
View File

@ -1,4 +1,5 @@
**/__pycache__**
weights**
.conda**
.venv
.venv
tmp**

81
auto.py
View File

@ -1,81 +0,0 @@
import os
import logging
import argparse
import shlex
import subprocess
from flask import Flask, request, jsonify, send_file
import threading
import traceback
import pyautogui
from PIL import Image
from io import BytesIO
# Guards /execute so that only one command runs at any given time.
computer_control_lock = threading.Lock()

# Command-line options: log destination and the port to listen on.
parser = argparse.ArgumentParser()
parser.add_argument(
    "--log_file",
    type=str,
    default=os.path.join(os.path.dirname(__file__), "server.log"),
    help="log file path",
)
parser.add_argument("--port", type=int, default=5000, help="port")
args = parser.parse_args()

# filemode='w' truncates the log file each time the script starts.
logging.basicConfig(
    filename=args.log_file,
    filemode='w',
    level=logging.DEBUG,
)
logger = logging.getLogger('werkzeug')

app = Flask(__name__)
@app.route('/probe', methods=['GET'])
def probe_endpoint():
    """Liveness probe: always answer HTTP 200 with a fixed JSON status body."""
    payload = {
        "status": "Probe successful",
        "message": "Service is operational",
    }
    return jsonify(payload), 200
@app.route('/execute', methods=['POST'])
def execute_command():
    """Run the command described by the JSON request body.

    The body may contain ``command`` (a string or an argument list) and
    ``shell`` (defaults to ``False``). A string command is tokenised with
    ``shlex.split`` unless ``shell`` is set, and arguments beginning with
    ``~/`` are expanded to the user's home directory.

    Returns:
        A JSON object with ``status``, ``output`` (stdout), ``error``
        (stderr) and ``returncode`` when the process ran, or a JSON object
        with ``status`` and ``message`` plus HTTP 500 when running it raised.
    """
    # The lock serialises requests: one command at a time.
    with computer_control_lock:
        payload = request.json
        shell = payload.get('shell', False)
        fallback = "" if shell else []
        command = payload.get('command', fallback)

        if not shell and isinstance(command, str):
            command = shlex.split(command)

        # Replace a leading "~/" with the user's home directory.
        for index, part in enumerate(command):
            if part.startswith("~/"):
                command[index] = os.path.expanduser(part)

        # NOTE: the command is executed as given, with no safety checks.
        try:
            completed = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=shell,
                text=True,
                timeout=120,
            )
        except Exception as e:
            logger.error("\n" + traceback.format_exc() + "\n")
            failure = {
                'status': 'error',
                'message': str(e)
            }
            return jsonify(failure), 500

        return jsonify({
            'status': 'success',
            'output': completed.stdout,
            'error': completed.stderr,
            'returncode': completed.returncode
        })
@app.route('/screenshot', methods=['GET'])
def capture_screen_with_cursor():
    """Return a PNG screenshot with a cursor image drawn at the pointer position.

    The cursor image is read from ``cursor.png`` next to this script, shrunk
    by a factor of 1.5, and pasted onto the screenshot using itself as mask.
    """
    here = os.path.dirname(__file__)
    cursor_path = os.path.join(here, "cursor.png")

    screenshot = pyautogui.screenshot()
    pointer_x, pointer_y = pyautogui.position()

    cursor = Image.open(cursor_path)
    # Shrink the cursor image before overlaying it.
    shrunk_size = (int(cursor.width / 1.5), int(cursor.height / 1.5))
    cursor = cursor.resize(shrunk_size)
    screenshot.paste(cursor, (pointer_x, pointer_y), cursor)

    # Serialise to an in-memory PNG and rewind so send_file reads from the start.
    buffer = BytesIO()
    screenshot.save(buffer, 'PNG')
    buffer.seek(0)
    return send_file(buffer, mimetype='image/png')
if __name__ == '__main__':
    # Debug mode is off on purpose: the server binds to every interface
    # (host="0.0.0.0"), and Flask's debug mode would expose the Werkzeug
    # interactive debugger, which allows code execution, to the network.
    app.run(debug=False, host="0.0.0.0", port=args.port)

View File

@ -4,7 +4,7 @@ from enum import StrEnum
from typing import Literal, TypedDict
from PIL import Image
from util import tool
from anthropic.types.beta import BetaToolComputerUse20241022Param
from .base import BaseAnthropicTool, ToolError, ToolResult
@ -236,18 +236,12 @@ class ComputerTool(BaseAnthropicTool):
try:
print(f"sending to vm: {command_list}")
response = requests.post(
f"http://localhost:5000/execute",
headers={'Content-Type': 'application/json'},
json={"command": command_list},
timeout=90
)
# 使用 tool.execute_command 替代 requests.post
response = tool.execute_command(command_list)
time.sleep(0.7) # avoid async error as actions take time to complete
print(f"action executed")
if response.status_code != 200:
raise ToolError(f"Failed to execute command. Status code: {response.status_code}")
if parse:
output = response.json()['output'].strip()
output = response['output'].strip()
match = re.search(r'Point\(x=(\d+),\s*y=(\d+)\)', output)
if not match:
raise ToolError(f"Could not parse coordinates from output: {output}")
@ -255,26 +249,6 @@ class ComputerTool(BaseAnthropicTool):
return x, y
except requests.exceptions.RequestException as e:
raise ToolError(f"An error occurred while trying to execute the command: {str(e)}")
async def screenshot(self):
if not hasattr(self, 'target_dimension'):
screenshot = self.padding_image(screenshot)
self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
width, height = self.target_dimension["width"], self.target_dimension["height"]
screenshot, path = get_screenshot(resize=True, target_width=width, target_height=height)
time.sleep(0.7) # avoid async error as actions take time to complete
return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode())
def padding_image(self, screenshot):
    """Place the screenshot on a white canvas whose width is 16/10 of its height.

    The canvas keeps the screenshot's height; its width is computed from
    that height alone (integer division). The screenshot is anchored at
    the top-left corner and the new canvas is returned.
    """
    _width, height = screenshot.size
    canvas_width = height * 16 // 10
    white = (255, 255, 255)
    canvas = Image.new("RGB", (canvas_width, height), white)
    # Anchor the original image at the top-left corner of the canvas.
    canvas.paste(screenshot, (0, 0))
    return canvas
def scale_coordinates(self, source: ScalingSource, x: int, y: int):
"""Scale coordinates to a target maximum resolution."""
if not self._scaling_enabled:
@ -306,20 +280,15 @@ class ComputerTool(BaseAnthropicTool):
return round(x / x_scaling_factor), round(y / y_scaling_factor)
# scale down
return round(x * x_scaling_factor), round(y * y_scaling_factor)
def get_screen_size(self):
"""Return width and height of the screen"""
try:
response = requests.post(
f"http://localhost:5000/execute",
headers={'Content-Type': 'application/json'},
json={"command": ["python", "-c", "import pyautogui; print(pyautogui.size())"]},
timeout=90
# 使用 tool.execute_command 替代 requests.post
response = tool.execute_command(
["python", "-c", "import pyautogui; print(pyautogui.size())"]
)
if response.status_code != 200:
raise ToolError(f"Failed to get screen size. Status code: {response.status_code}")
output = response.json()['output'].strip()
output = response['output'].strip()
match = re.search(r'Size\(width=(\d+),\s*height=(\d+)\)', output)
if not match:
raise ToolError(f"Could not parse screen size from output: {output}")

View File

@ -1,9 +1,9 @@
from pathlib import Path
from uuid import uuid4
import requests
from PIL import Image
from .base import BaseAnthropicTool, ToolError
from io import BytesIO
from util import tool
OUTPUT_DIR = "./tmp/outputs"
@ -14,12 +14,9 @@ def get_screenshot(resize: bool = False, target_width: int = 1920, target_height
path = output_dir / f"screenshot_{uuid4().hex}.png"
try:
response = requests.get('http://localhost:5000/screenshot')
if response.status_code != 200:
raise ToolError(f"Failed to capture screenshot: HTTP {response.status_code}")
# (1280, 800)
screenshot = Image.open(BytesIO(response.content))
# 使用 tool.capture_screen_with_cursor 替代 requests.get
img_io = tool.capture_screen_with_cursor()
screenshot = Image.open(img_io)
if resize and screenshot.size != (target_width, target_height):
screenshot = screenshot.resize((target_width, target_height))

BIN
resources/cursor.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.1 KiB

51
util/tool.py Normal file
View File

@ -0,0 +1,51 @@
import os
import logging
import argparse
import shlex
import subprocess
import threading
import traceback
import pyautogui
from PIL import Image
from io import BytesIO
# Serialises command execution: only one command runs at a time.
computer_control_lock = threading.Lock()
# Module logger used to record failures of executed commands.
logger = logging.getLogger(__name__)


def execute_command(command, shell=False):
    """Run a command locally and describe the outcome as a dict.

    Args:
        command: An argument list, or a single string. A string is split
            with ``shlex.split`` unless ``shell`` is true. The caller's list
            is never modified.
        shell: Forwarded to ``subprocess.run``.

    Returns:
        ``{'status': 'success', 'output': <stdout>, 'error': <stderr>,
        'returncode': <int>}`` when the process ran to completion, or
        ``{'status': 'error', 'message': <str>}`` when starting or running
        it raised (for example a missing executable or the 120 s timeout).
    """
    with computer_control_lock:
        if isinstance(command, str) and not shell:
            command = shlex.split(command)

        if not isinstance(command, str):
            # Expand a leading "~/" on a copy so the caller's list is left
            # untouched; non-string arguments are passed through unchanged.
            command = [
                os.path.expanduser(arg)
                if isinstance(arg, str) and arg.startswith("~/")
                else arg
                for arg in command
            ]

        try:
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=shell,
                text=True,
                timeout=120,
            )
        except Exception as e:
            # Boundary handler: log the traceback and report the failure to
            # the caller as data instead of raising.
            logger.error("\n" + traceback.format_exc() + "\n")
            return {
                'status': 'error',
                'message': str(e)
            }

        return {
            'status': 'success',
            'output': result.stdout,
            'error': result.stderr,
            'returncode': result.returncode
        }
def capture_screen_with_cursor():
    """Capture the screen, overlay a cursor image, and return it as PNG bytes.

    The cursor image is loaded from ``../resources/cursor.png`` relative to
    this module, shrunk by a factor of 1.5, and pasted at the current
    pointer position using itself as the mask.

    Returns:
        A ``BytesIO`` holding the PNG data, positioned at offset 0.
    """
    module_dir = os.path.dirname(__file__)
    cursor_path = os.path.join(module_dir, "..", "resources", "cursor.png")

    screenshot = pyautogui.screenshot()
    pointer_x, pointer_y = pyautogui.position()

    cursor = Image.open(cursor_path)
    # Shrink the cursor image before overlaying it.
    shrunk_size = (int(cursor.width / 1.5), int(cursor.height / 1.5))
    cursor = cursor.resize(shrunk_size)
    screenshot.paste(cursor, (pointer_x, pointer_y), cursor)

    # Rewind after writing so callers can read the PNG from the beginning.
    buffer = BytesIO()
    screenshot.save(buffer, 'PNG')
    buffer.seek(0)
    return buffer