add few shot generate

This commit is contained in:
yuruo
2025-03-25 23:27:22 +08:00
parent ba483e1cb1
commit d50f3d42d2
5 changed files with 223 additions and 128 deletions

View File

@@ -0,0 +1,37 @@
from argparse import Action
import json
from auto_control.agent.base_agent import BaseAgent
from xbrain.core.chat import run
class FewShotGenerateAgent(BaseAgent):
    """Agent that turns a recorded user-action trace into few-shot examples.

    Builds one multimodal chat message containing, per action, a text part
    (the action dict minus its screenshot) and an image part (the base64
    screenshot), then asks the LLM (via xbrain's `run`) to synthesize
    structured interaction examples.
    """

    def __call__(self, action_list):
        """Generate few-shot learning examples from recorded actions.

        Args:
            action_list: list of action dicts (ActionRecord.data payloads);
                each may carry an optional 'base64_image' screenshot.

        Returns:
            The LLM response produced by `run`.
        """
        # Create content list with text-image pairs for each action.
        content_list = []
        for idx, action in enumerate(action_list, 1):
            # Strip the (potentially huge) screenshot payload from the
            # textual description; the image is sent as its own part below.
            action_without_screen = action.copy()
            base64_image = action_without_screen.pop('base64_image', None)
            content_list.append(
                {"type": "text", "text": f"Step {idx}:\n{json.dumps(action_without_screen, indent=2)}"}
            )
            # BUG FIX: only attach an image part when a screenshot exists,
            # instead of raising KeyError on actions recorded without one.
            if base64_image is not None:
                content_list.append({
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{base64_image}"}
                })
        messages = [{"role": "user", "content": content_list}]
        # BUG FIX: `prompt` contains no `{actions}` placeholder, so the old
        # `prompt.format(actions=json.dumps(action_list, indent=2))` was a
        # no-op that still JSON-serialized every base64 screenshot. Pass the
        # prompt unchanged (identical resulting user_prompt, no wasted work).
        response = run(
            messages,
            user_prompt=prompt)
        return response
# System prompt for FewShotGenerateAgent.
# NOTE(review): this template has no `{actions}` placeholder, so calling
# `prompt.format(actions=...)` on it returns the string unchanged.
prompt = """Please analyze this sequence of user input actions and create few-shot learning examples.
The recorded actions include mouse clicks, keyboard inputs, and special key presses, along with their timing and UI context.
Please create structured examples that show:
1. The user's intent and context
2. The sequence of actions needed
3. Important UI elements involved
4. Any timing or order dependencies
Format each example to demonstrate the complete interaction pattern."""

View File

@@ -1,3 +1,4 @@
import base64
from io import BytesIO
from pathlib import Path
from uuid import uuid4
@@ -8,7 +9,7 @@ from util import tool
OUTPUT_DIR = "./tmp/outputs"
def get_screenshot(screen_region=None, is_cursor=True):
def get_screenshot(screen_region=None, is_cursor=True, is_base64=False):
output_dir = Path(OUTPUT_DIR)
output_dir.mkdir(parents=True, exist_ok=True)
path = output_dir / f"screenshot_{uuid4().hex}.png"
@@ -31,7 +32,10 @@ def get_screenshot(screen_region=None, is_cursor=True):
black_mask.paste(region, (x1, y1, x2, y2))
# Use the modified image as screenshot
screenshot = black_mask
screenshot.save(path)
if is_base64:
screenshot.save(path)
with open(path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8'), path
return screenshot, path
except Exception as e:
raise ToolError(f"Failed to capture screenshot: {str(e)}")

View File

@@ -114,7 +114,6 @@ class MainWindow(QMainWindow):
self.hotkey_handler = None
except:
pass
try:
keyboard.unhook_all_hotkeys()
except:
@@ -146,8 +145,25 @@ class MainWindow(QMainWindow):
def _stop_process_main_thread(self):
    """Safely perform stop handling on the main (GUI) thread."""
    # Signal cooperative shutdown to anything polling this shared flag.
    self.state["stop"] = True
    # Stop the worker.
    if hasattr(self, 'worker') and self.worker is not None:
        self.worker.terminate()
    # Stop the recording/listening thread.
    if hasattr(self, 'recording_manager') and hasattr(self.recording_manager, 'listen_thread'):
        if self.recording_manager.listen_thread is not None and self.recording_manager.listen_thread.isRunning():
            # Politely ask the listener thread to stop.
            self.recording_manager.listen_thread.requestInterruption()
            self.recording_manager.listen_thread.wait(1000)  # wait at most 1 second
            if self.recording_manager.listen_thread.isRunning():
                self.recording_manager.listen_thread.terminate()  # force-terminate as last resort
            # Clean up related state.
            # NOTE(review): the nesting of the two lines below is ambiguous in
            # the rendered diff; assumed to belong to the isRunning() branch.
            self.recording_manager.listen_thread = None
            self.chat_panel.append_message("📝 录制已停止", "blue")
    # ... other existing stop-handling code ...
    if self.isMinimized():
        self.showNormal()
    self.activateWindow()

View File

@@ -2,10 +2,38 @@
Recording manager for autoMate
Handles recording and demonstration functionality
"""
import util.auto_control as auto_control
from util.auto_control import AutoControl
from ui.recording_panel import RecordingIndicator
from ui.demonstration_panel import DemonstrationPanel
from PyQt6.QtCore import QThread, pyqtSignal
import time
class ActionListenThread(QThread):
    """Background thread that runs an AutoControl listening session.

    Starts the listener, idle-polls until an interruption is requested,
    then stops the listener and emits `finished_signal` so the owner can
    post-process the recorded actions.
    """

    finished_signal = pyqtSignal()

    def __init__(self, action_listen):
        super().__init__()
        self.action_listen = action_listen

    def run(self):
        try:
            # Kick off the (non-blocking) mouse/keyboard listeners.
            self.action_listen.start_listen()
            # Idle-poll until the owner requests interruption.
            while True:
                if self.isInterruptionRequested():
                    break
                time.sleep(0.1)
        except Exception as exc:
            print(f"Action listening error: {exc}")
        finally:
            try:
                # Always tear the listeners down and notify the owner,
                # even when the poll loop exited via an exception.
                self.action_listen.stop_listen()
                self.finished_signal.emit()
            except Exception as exc:
                print(f"Cleanup error: {exc}")
class RecordingManager:
def __init__(self, parent=None):
self.parent = parent
@@ -13,69 +41,44 @@ class RecordingManager:
self.recording_indicator = None
self.demo_panel = None
self.demonstration_mode = False
def start_recording(self):
"""Start recording user actions"""
if not self.recording_in_progress:
self.recording_in_progress = True
# 最小化主窗口
if self.parent:
self.parent.showMinimized()
# 显示录制指示器
self.recording_indicator = RecordingIndicator(stop_callback=self.stop_recording)
self.recording_indicator.show()
# 开始监听用户动作
auto_control.start_monitoring()
def stop_recording(self):
"""Stop recording user actions"""
if self.recording_in_progress:
self.recording_in_progress = False
# 停止监听用户动作
auto_control.stop_monitoring()
# 关闭录制指示器
if self.recording_indicator:
self.recording_indicator.close()
self.recording_indicator = None
# 恢复主窗口
if self.parent:
self.parent.showNormal()
self.action_listen = AutoControl()
def start_demonstration(self):
"""Start demonstration mode for system learning"""
# Set demonstration mode flag
self.demonstration_mode = True
# 隐藏主窗口
# hide main window
if self.parent:
self.parent.showMinimized()
# 创建并显示独立的演示控制面板
# create and show independent demonstration control panel
self.demo_panel = DemonstrationPanel(stop_callback=self.stop_demonstration)
self.demo_panel.show()
# 开始监听用户动作
auto_control.start_monitoring()
# create and start listen thread
self.listen_thread = ActionListenThread(self.action_listen)
self.listen_thread.finished_signal.connect(self.process_recorded_actions)
self.listen_thread.start()
def stop_demonstration(self):
"""Stop demonstration mode and process the recorded actions"""
# 停止监听用户动作
auto_control.stop_monitoring()
# 关闭独立的演示控制面板
# stop listening to user actions
self.listen_thread.requestInterruption()
# close independent demonstration control panel
if self.demo_panel:
self.demo_panel.close()
self.demo_panel = None
# 恢复主窗口
# restore main window
if self.parent:
self.parent.showNormal()
# Reset state
self.demonstration_mode = False
self.demonstration_mode = False
def process_recorded_actions(self):
    """Dump every action captured by the listener for inspection."""
    # All ActionRecord payloads accumulated during the listening session.
    actions = self.action_listen.auto_list
    print("recorded_actions: ", actions)

View File

@@ -1,98 +1,81 @@
import sys
import os
import time
import json
from auto_control.agent.few_shot_generate_agent import FewShotGenerateAgent
# Add the project root directory to Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from auto_control.agent.vision_agent import VisionAgent
from util.download_weights import OMNI_PARSER_DIR
from pynput import mouse, keyboard
# Now you can import from auto_control
from auto_control.tools.screen_capture import get_screenshot
class ActionRecord:
    """Standardized data structure for all user actions."""

    def __init__(self,
                 action_type: str,
                 position: tuple = (0, 0),
                 button: str = "",
                 key: str = "",
                 text: str = "",
                 base64_image=None):
        # Single dict payload so every record serializes uniformly downstream.
        self.data = dict(
            type=action_type,            # e.g. 'click', 'scroll', 'key_press', 'text_input'
            timestamp=time.time(),       # capture time of the action
            position=position,           # mouse position or input position
            button=button,               # mouse button name
            key=key,                     # keyboard key name
            text=text,                   # buffered text content
            base64_image=base64_image,   # optional screenshot (base64 PNG)
        )
class AutoControl:
def __init__(self):
self.auto_list = []
self.tmp_auto_list = []
self.text_buffer = [] # Buffer for collecting continuous text input
self.last_key_time = 0 # Timestamp of last keypress
self.input_timeout = 1.0 # Input timeout in seconds
def start_listen(self):
# Create both mouse and keyboard listeners
mouse_listener = mouse.Listener(
on_move=self.on_move,
self.mouse_listener = mouse.Listener(
on_click=self.on_click,
on_scroll=self.on_scroll)
keyboard_listener = keyboard.Listener(
self.keyboard_listener = keyboard.Listener(
on_press=self.on_press,
on_release=self.on_release)
# Start both listeners
mouse_listener.start()
keyboard_listener.start()
# Keep the program running until keyboard listener stops
keyboard_listener.join()
# After keyboard stops (ESC pressed), stop mouse listener too
mouse_listener.stop()
self.mouse_listener.start()
self.keyboard_listener.start()
def on_move(self, x, y, injected):
print('Pointer moved to {}; it was {}'.format(
(x, y), 'faked' if injected else 'not faked'))
def stop_listen(self):
self.mouse_listener.stop()
self.keyboard_listener.stop()
def on_click(self, x, y, button, pressed, injected):
print('Mouse {} {} at {}; it was {}'.format(
button,
'Pressed' if pressed else 'Released',
(x, y),
'faked' if injected else 'not faked'))
if not pressed:
# wait right click window
if button == mouse.Button.right:
time.sleep(1)
screenshot, path = get_screenshot(is_cursor=False)
self.auto_list.append(
{"button": button,
"pressed": pressed,
"position": (x, y),
"path": path,
"image": screenshot
}
screenshot, _ = get_screenshot(is_base64=True)
record = ActionRecord(
action_type="click",
position=(x, y),
button=str(button),
base64_image=screenshot
)
self.auto_list.append(record.data)
def on_scroll(self, x, y, dx, dy, injected):
    """Pynput mouse-scroll callback: log and record a 'scroll' action.

    Captures a base64 screenshot at scroll time and appends an
    ActionRecord payload to self.auto_list.
    """
    print('Scrolled {} at {}; it was {}'.format(
        'down' if dy < 0 else 'up',
        (x, y), 'faked' if injected else 'not faked'))
    # Screenshot provides UI context for the recorded action.
    screenshot, _ = get_screenshot(is_base64=True)
    record = ActionRecord(
        action_type="scroll",
        text=f"{'down' if dy < 0 else 'up'}",  # only vertical direction is kept; dx is ignored
        base64_image=screenshot
    )
    self.auto_list.append(record.data)
def on_press(self, key, injected):
    """Debug logger for key presses (legacy version)."""
    # `key.char` exists only for printable keys; special keys raise
    # AttributeError and fall through to the second print.
    try:
        print('alphanumeric key {} pressed; it was {}'.format(
            key.char, 'faked' if injected else 'not faked'))
    except AttributeError:
        print('special key {} pressed'.format(
            key))
def on_release(self, key, injected):
    """Key-release handler: on ESC, run vision analysis and stop listening."""
    print('{} released; it was {}'.format(
        key, 'faked' if injected else 'not faked'))
    if key == keyboard.Key.esc:
        print("self.auto_list", self.auto_list)
        # Parse each recorded screenshot with the OmniParser YOLO model to
        # locate UI elements near the recorded click positions.
        vision_agent = VisionAgent(yolo_model_path=os.path.join(OMNI_PARSER_DIR, "icon_detect", "model.pt"))
        for item in self.auto_list:
            element_list = vision_agent(str(item["path"]))
            for element in element_list:
                # Crop once a detected element contains the click position;
                # stop scanning elements for this item after the first match.
                if self.crop_image_if_position_in_coordinates(item["image"], item["path"], item["position"], element.coordinates):
                    break
        # Stop listener
        # Returning False tells pynput to stop this listener.
        return False
def crop_image_if_position_in_coordinates(self, image, image_path, position, coordinates):
"""
Check if position is within coordinates and crop image if true
@@ -119,24 +102,76 @@ class AutoControl:
return False
# User action monitoring module
def on_press(self, key, injected):
    """Pynput key-press callback: buffer printable chars, record special keys.

    Printable characters accumulate in self.text_buffer and are later
    flushed as a single 'text_input' action (see _process_text_buffer);
    non-character keys are recorded immediately as 'key_press' actions
    together with a screenshot.
    """
    try:
        current_time = time.time()
        try:
            char = key.char
        except AttributeError:
            # Non-character key. Space/Enter terminate a text run, so flush
            # any pending buffered text before recording the key itself.
            if self.text_buffer and key in [keyboard.Key.space, keyboard.Key.enter]:
                self._process_text_buffer()
            # Record special key press
            screenshot, _ = get_screenshot(is_base64=True)
            record = ActionRecord(
                action_type="key_press",
                key=str(key),
                base64_image=screenshot
            )
            self.auto_list.append(record.data)
            return
        # A typing pause longer than input_timeout splits the text into
        # separate 'text_input' actions.
        if current_time - self.last_key_time > self.input_timeout and self.text_buffer:
            self._process_text_buffer()
        self.text_buffer.append(char)
        self.last_key_time = current_time
    except Exception as e:
        print(f"Error in on_press: {e}")
def on_release(self, key, injected):
    """Pynput key-release callback: flush text and record special keys.

    NOTE(review): on_press already records non-character keys as
    'key_press'; recording them again here as 'special_key' looks like
    double-counting — confirm this duplication is intended.
    """
    try:
        # Process buffer immediately for these keys
        if key in [keyboard.Key.enter, keyboard.Key.tab]:
            if self.text_buffer:
                self._process_text_buffer()
        # Record special keys (anything without a `.char` attribute)
        # along with a screenshot for UI context.
        if not hasattr(key, 'char'):
            screenshot, _ = get_screenshot(is_base64=True)
            record = ActionRecord(
                action_type="special_key",
                key=str(key),
                base64_image=screenshot
            )
            self.auto_list.append(record.data)
    except Exception as e:
        print(f"Error in on_release: {e}")
def _process_text_buffer(self):
    """Flush buffered keystrokes as a single 'text_input' action."""
    if not self.text_buffer:
        # Nothing buffered; avoid recording empty text actions.
        return
    buffered_text = ''.join(self.text_buffer)
    # Screenshot gives the UI context at the moment the text is flushed.
    shot, _ = get_screenshot(is_base64=True)
    self.auto_list.append(
        ActionRecord(
            action_type="text_input",
            text=buffered_text,
            base64_image=shot,
        ).data
    )
    # Reset for the next run of keystrokes.
    self.text_buffer = []
def start_monitoring():
    """Start monitoring user actions (keyboard and mouse).

    Placeholder stub: it only logs; actual listening is implemented by
    AutoControl.start_listen (pynput-based).
    """
    print("Started monitoring user actions")
def stop_monitoring():
    """Stop monitoring user actions.

    Placeholder stub: it only logs; actual teardown is implemented by
    AutoControl.stop_listen.
    """
    print("Stopped monitoring user actions")
# Additional functionality for processing recorded actions
def stop_listen(self):
    """Stop both input listeners and hand the recorded trace to the LLM.

    Returns:
        Few-shot examples generated from self.auto_list by
        FewShotGenerateAgent.
    """
    # Tear down the pynput listeners first (keyboard, then mouse) so no
    # new actions arrive while the trace is being analyzed.
    for listener in (self.keyboard_listener, self.mouse_listener):
        listener.stop()
    return FewShotGenerateAgent()(self.auto_list)
if __name__ == "__main__":
auto_control = AutoControl()