update few_shot

yuruo 2025-03-29 11:08:09 +08:00
parent fd0f6fc722
commit cabb3fa7c7
13 changed files with 1722 additions and 134 deletions

View File

@ -1,7 +1,15 @@
pyxbrain==1.1.31
pynput
pyqt6
pyautogui==0.9.54
pillow==11.1.0
pyautogui
pillow
keyboard
mouse
speechrecognition
pyaudio
numpy
soundfile
pydub
webrtcvad-wheels
baidu-aip
chardet

View File

@ -3,15 +3,57 @@ Conversation manager module for handling dialog flow and states
"""
import json
import time
from PyQt6.QtCore import QObject, QThread, QTimer
from PyQt6.QtCore import QObject, QThread, QTimer, pyqtSignal, pyqtSlot
from src.core.few_shot_agent import FewShotGenerateAgent
from src.core.input_listener import InputListener
from src.utils.audio_recorder import AudioRecorder
from xbrain.core.chat import run
import multiprocessing
from multiprocessing import Process, Queue, Manager
class AnalysisWorker(QObject):
finished = pyqtSignal(str)
error = pyqtSignal(str)
# Changed the signal back to accept only a single argument
progress_update = pyqtSignal(str)
def __init__(self, agent, task_demonstration, user_instruction):
super().__init__()
self.agent = agent
self.task_demonstration = task_demonstration
self.user_instruction = user_instruction
@pyqtSlot()
def process(self):
try:
# Run the analysis; the agent returns a generator
result_generator = self.agent(self.task_demonstration, self.user_instruction)
# Iterate over the generator and handle each yielded chunk
for content in result_generator:
# Emit each generated content update
self.progress_update.emit(content)
self.finished.emit("Analysis complete")
except Exception as e:
self.error.emit(str(e))
# Handle progress updates from streaming output
def handle_analysis_progress(self, segment_text, is_thinking):
"""Handle streaming output produced during analysis"""
if self.current_ai_message_id is not None:
# Apply a different style depending on is_thinking
if is_thinking:
# Render the thinking process in a lighter, italic style
styled_text = f"<span style='color: #888888; font-style: italic;'>{segment_text}</span>"
self.chat_area.update_message(self.current_ai_message_id, styled_text, preserve_html=True)
else:
# Render final results in the normal style
self.chat_area.update_message(self.current_ai_message_id, segment_text)
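
AnalysisWorker.process() simply iterates whatever the injected agent yields and re-emits each chunk through progress_update, so any callable returning a generator of strings can be driven this way. A minimal sketch of that contract, using a hypothetical EchoAgent stand-in (not the real FewShotGenerateAgent):

```python
# Minimal sketch of the generator contract AnalysisWorker expects.
# EchoAgent is a hypothetical stand-in for FewShotGenerateAgent.
class EchoAgent:
    def __call__(self, task_demonstration, user_instruction):
        yield f"Analyzing {len(task_demonstration)} recorded steps..."
        yield f"User instruction: {user_instruction}"
        yield "Done."

def drive(agent, task_demonstration, user_instruction, emit):
    """Mirror AnalysisWorker.process(): forward every yielded chunk."""
    for chunk in agent(task_demonstration, user_instruction):
        emit(chunk)

drive(EchoAgent(), [{"type": "action"}], "open the settings page", print)
```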
class ConversationManager(QObject):
"""
Manages conversation state and process user interactions
@ -45,6 +87,11 @@ class ConversationManager(QObject):
self.analysis_results = self.manager.dict()
self.pool = multiprocessing.Pool(processes=1)
self.user_instruction = ""
# New: voice recording and intent detection state
self.is_voice_recording = False
self.utterances = []
# Start the conversation
self.start_conversation()
@ -86,26 +133,52 @@ class ConversationManager(QObject):
"""Handle user's response to the demo request"""
if any(keyword in message.lower() for keyword in ["can", "yes", "now", "start", "demo"]):
response = "Great! I'll minimize the window but keep a small control in the corner. " + \
"Click 'Finish Demo' when you're done, and I'll record your steps."
"Click 'Finish Demo' when you're done, and I'll record your steps. " + \
"I'll also record your voice to understand your intentions while performing actions."
self.chat_area.add_message("Xiao Hong", response)
self.conversation_state = "task_demonstration"
self.is_recording = True
# Clear any previous recordings
self.task_demonstration = []
self.utterances = []
# Delay 1 second before starting recording mode
QTimer.singleShot(1000, self.start_recording_mode)
else:
response = "No problem, just let me know whenever you're ready to demonstrate. I'll be here."
self.chat_area.add_message("Xiao Hong", response)
def handle_utterance(self, utterance_data):
"""
处理检测到的语音意图
Args:
utterance_data: 包含语音识别结果的字典
"""
# 添加到utterances列表
self.utterances.append(utterance_data)
# 添加步骤计数
utterance_data['step_number'] = self.step_counter
# 更新状态显示
status_text = f"语音命令: \"{utterance_data['text']}\""
self.update_mini_window_status(status_text)
# 这里我们不把utterance添加到task_demonstration列表
# 因为我们需要将其作为分割点,而不是直接作为动作
def analyze_action(self, action):
"""Analyze user actions during demonstration"""
self.step_counter += 1
# Prepare a simplified action record
action_data = {
'type': action['type'],
'type': 'action', # New: explicitly mark this entry as an action
'event': str(action['event']),
'step_number': self.step_counter,
'timestamp': time.time(), # New: add a timestamp
'base64_image': action['base64_image']
}
@ -157,8 +230,6 @@ class ConversationManager(QObject):
# Update the status display
self.update_mini_window_status(status_text)
def update_mini_window_status(self, text):
"""
Update the status text in the mini window
@ -177,27 +248,65 @@ class ConversationManager(QObject):
# Show mini window
self.mini_window.show()
self.chat_area.add_message("System", "Recording your demonstration...")
self.chat_area.add_message("System", "Recording your demonstration and voice...")
# Create input listener
self.keyboard_mouse_listen = InputListener()
self.keyboard_mouse_listen.action_detected.connect(self.analyze_action)
# Set up thread
# Set up thread for input listening
self.listen_thread = QThread()
self.keyboard_mouse_listen.terminated.connect(self.listen_thread.quit)
self.keyboard_mouse_listen.moveToThread(self.listen_thread)
self.listen_thread.started.connect(self.keyboard_mouse_listen.start_listen)
# Start thread
# New: create the voice recorder
self.audio_recorder = AudioRecorder()
self.audio_recorder.utterance_detected.connect(self.handle_utterance)
self.audio_recorder.recording_status.connect(self.update_audio_status)
# Set up the voice recording thread
self.audio_thread = QThread()
self.audio_recorder.terminated.connect(self.audio_thread.quit)
self.audio_recorder.moveToThread(self.audio_thread)
self.audio_thread.started.connect(self.audio_recorder.start_recording)
# Start threads
self.listen_thread.start()
self.audio_thread.start()
self.is_voice_recording = True
def update_audio_status(self, status):
"""
更新语音状态信息
Args:
status: 语音状态文本
"""
# 在mini window使用专用语音状态标签
if hasattr(self.mini_window, 'set_voice_status'):
self.mini_window.set_voice_status(status)
# 同时也更新主状态区域的显示
current_text = self.mini_window.status_label.text() if hasattr(self.mini_window, 'status_label') else ""
if "语音" not in current_text:
self.update_mini_window_status(f"{current_text}\n语音: {status}")
else:
# 替换语音状态部分
lines = current_text.split("\n")
updated_lines = [line if "语音" not in line else f"语音: {status}" for line in lines]
self.update_mini_window_status("\n".join(updated_lines))
def finish_demonstration(self):
"""Complete the demonstration recording process"""
# Shut down the process pool and wait for all tasks to complete
# Clean up
# Clean up keyboard/mouse listener
self.keyboard_mouse_listen.stop_listen()
# New: stop voice recording
if self.is_voice_recording:
self.audio_recorder.stop_recording()
self.is_voice_recording = False
# Restore main window
if hasattr(self, 'parent'):
self.parent().showNormal()
@ -206,35 +315,79 @@ class ConversationManager(QObject):
self.mini_window.hide()
self.is_recording = False
# Merge utterances and actions
self.prepare_mixed_sequence()
# Save the demonstration data
self.save_task_demonstration()
# Show a learning-in-progress message
self.chat_area.add_message("System", "Learning in progress, please wait...")
# Create process pool for few shot agent
self.pool = multiprocessing.Pool(processes=1)
# Call few shot agent asynchronously
agent = FewShotGenerateAgent()
# Get user instruction from main window
result = self.pool.apply_async(agent, args=(self.task_demonstration, self.user_instruction))
# Create an analysis thread instead of a process pool
self.analysis_thread = QThread()
self.agent = FewShotGenerateAgent()
try:
# Get result with timeout
response = result.get(timeout=999)
# Display response from agent
self.chat_area.add_message("Xiao Hong", "I've analyzed your demonstration. Here's what I learned:\n" + response)
# Use the merged mixed sequence rather than the action-only sequence
self.worker = AnalysisWorker(self.agent, self.task_demonstration, self.user_instruction)
# Connect signals to slots
self.worker.finished.connect(self.handle_analysis_result)
self.worker.error.connect(self.handle_analysis_error)
self.worker.progress_update.connect(self.handle_progress_update)
# Move the worker to the thread
self.worker.moveToThread(self.analysis_thread)
self.analysis_thread.started.connect(self.worker.process)
self.worker.finished.connect(self.analysis_thread.quit)
self.worker.error.connect(self.analysis_thread.quit)
# Start the thread
self.analysis_thread.start()
# Add a progress indicator
self.progress_timer = QTimer(self)
self.progress_timer.timeout.connect(self.update_analysis_progress)
self.progress_counter = 0
self.progress_timer.start(1000)
def prepare_mixed_sequence(self):
"""
准备混合序列将utterances和actions合并成按时间排序的混合序列
"""
# 将utterances添加到task_demonstration中
mixed_sequence = self.task_demonstration.copy()
for utterance in self.utterances:
# 确保每个utterance都有timestamp
if 'timestamp' not in utterance:
utterance['timestamp'] = time.time() # 如果没有时间戳,使用当前时间
except TimeoutError:
self.chat_area.add_message("System", "Analysis timed out. Please try again.")
except Exception as e:
self.chat_area.add_message("System", f"Error during analysis: {str(e)}")
finally:
# Clean up pool
self.pool.close()
self.pool.join()
mixed_sequence.append(utterance)
# Sort by timestamp
self.task_demonstration = sorted(mixed_sequence, key=lambda x: x.get('timestamp', 0))
def update_analysis_progress(self):
"""更新分析进度"""
self.progress_counter += 1
if self.progress_counter % 15 == 0: # 每15秒更新一次消息
self.chat_area.add_message("System", f"Analysis in progress... ({self.progress_counter} seconds)")
def handle_analysis_result(self, result):
"""处理分析结果"""
self.progress_timer.stop()
if result != "分析完成":
self.chat_area.add_message("Xiao Hong", result)
self.conversation_state = "ready"
def handle_analysis_error(self, error_msg):
"""处理分析错误"""
self.progress_timer.stop()
self.chat_area.add_message("System", f"Error during analysis: {error_msg}")
print(f"Error during analysis: {error_msg}")
self.conversation_state = "ready"
def handle_ready_state(self, message):
"""
@ -255,6 +408,10 @@ class ConversationManager(QObject):
except Exception as e:
self.chat_area.add_message("System", f"Error saving task demonstration: {str(e)}")
def handle_progress_update(self, content):
"""处理分析过程中的进度更新"""
self.chat_area.add_message("Xiao Hong", content)
def __del__(self):
"""析构函数,确保进程池正确关闭"""
if hasattr(self, 'pool'):
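
prepare_mixed_sequence() above only relies on each entry carrying a 'type' and a 'timestamp': actions come from analyze_action() and utterances from the audio recorder. A small self-contained sketch of that merge, with made-up values shaped like the real entries:

```python
import time

# Entries shaped like analyze_action() output and utterance payloads (values are made up).
actions = [
    {"type": "action", "event": "left click", "step_number": 1, "timestamp": 100.0},
    {"type": "action", "event": "type 'hello'", "step_number": 2, "timestamp": 103.5},
]
utterances = [
    {"type": "utterance", "text": "open the editor", "timestamp": 99.2},
    {"type": "utterance", "text": "now save the file", "timestamp": 102.0},
]

# Same merge-and-sort logic as prepare_mixed_sequence()
mixed = actions.copy()
for u in utterances:
    u.setdefault("timestamp", time.time())
    mixed.append(u)
mixed.sort(key=lambda x: x.get("timestamp", 0))

for entry in mixed:
    print(entry["timestamp"], entry["type"], entry.get("text") or entry.get("event"))
```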

View File

@ -1,72 +1,171 @@
import copy
from xbrain.core.chat import run
class FewShotGenerateAgent:
def __call__(self, action_list, user_instruction):
# Create content list with text-image pairs for each action
# Create action message without base64 image
action_list_copy = action_list.copy()
action_list_copy = [i.pop('base64_image') for i in action_list_copy]
messages = [{"role": "user", "content":
[{"type": "text", "text": "用户的指令是" + user_instruction + "\n\n 用户的动作序列是:\n".join(action_list_copy)}]}]
print("action_list", action_list)
for action in action_list:
print("action", action)
action_copy = action.copy()
action_copy.pop('base64_image', None)
messages[0]["content"].append(
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{action['base64_image']}"}
}
)
response = run(
messages,
user_prompt=prompt)
return response
prompt = """
角色 你的角色是分析用户界面交互并为用于任务自动化的多模态大模型生成few-shot案例的专家
背景 我正在开发一个能够理解视觉UI元素并给出自动化步骤多模态推理的智能体为了训练或调整condition这个智能体我需要将记录下来的用户交互序列转换为清晰结构化的few-shot示例
目标 根据提供的用户指令动作序列包括事件类型步骤编号和相应截图生成一个简洁准确的few-shot示例这个示例应清晰地将用户的高级指令和视觉上下文映射到执行的低级动作使其适用于智能体的学习上下文
你将收到的输入
[{
'type':动作类型例如 'mouse', 'keyboard'
'event':具体事件例如 'left click', 'type', 'scroll down'
'step_number':动作的顺序编号,每一个动作都对应着一张图片
'text_buffer':如果是键盘动作则记录的是输入的文本缓冲内容
}]
"""
Generate Few-Shot examples from action list and user instruction
Args:
action_list: List of actions including screenshots
user_instruction: Optional user instruction or intent
Yields:
Streamed analysis text, followed by the generated Few-Shot examples
"""
action_list_copy = copy.deepcopy(action_list)
yield from self._process_utterance_based_sequence(action_list_copy, user_instruction)
def _process_utterance_based_sequence(self, mixed_sequence, user_instruction):
"""Process a sequence that contains both utterances and actions"""
from src.core.workflow_extractor import WorkflowExtractor
# Extract workflow segments based on utterances
extractor = WorkflowExtractor()
workflow_segments = extractor.extract_workflows(mixed_sequence)
# Process each workflow segment
results = []
for segment in workflow_segments:
intent = segment['intent']
actions = segment['actions']
# Skip segments with no actions
if not actions:
continue
# Prepare the prompt with the specific intent and overall user instruction
messages = [{"role": "user", "content":
[{"type": "text", "text": f"用户的总体目标是:{user_instruction}\n用户的当前意图是:{intent}\n动作序列如下。"}]}]
# Add images
for action in actions:
messages[0]["content"].append(
{
"type": "text",
"text": f"{str({k: v for k, v in action.items() if k != 'base64_image'})}"
}
)
messages[0]["content"].append(
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{action['base64_image']}"}
}
)
# Call the LLM for this segment
segment_response = run(messages, user_prompt=think_prompt)
# Extract the thinking process
thinking_process = segment_response
# Yield the thinking process immediately
yield thinking_process
# Collect the thinking process and intent instead of generating few-shot examples right away
results.append({
"intent": intent,
"thinking": thinking_process
})
# Prepare a summary of all thinking processes
all_thinking_processes = "\n\n======= 分隔线 =======\n\n".join([
f"意图:{item['intent']}\n\n思考过程:\n{item['thinking']}"
for item in results
])
# Generate all few-shot examples in one pass, passing along the user's overall goal
combined_messages = [{"role": "user", "content": [{"type": "text", "text": f"用户的总体目标是:{user_instruction}\n\n基于以下所有思考过程生成相应的few-shot示例集合确保示例不偏离用户总体目标\n\n{all_thinking_processes}"}]}]
all_few_shots = run(combined_messages, user_prompt=few_shot_prompt)
# Yield all few-shot examples
yield all_few_shots
return
分析提供的typeevent并仔细检查图片中的视觉内容精确地按照以下格式生成一个连贯的few-shot示例
think_prompt = """
# 角色
你是一位顶级的用户界面交互分析专家擅长深度解读用户在视觉界面上的操作序列并从中推断用户的意图策略以及操作的有效性
```
**指令** [在此处插入准确的用户意图]
# 背景
我正在开发一个先进的多模态智能体目标是让它能理解并执行 GUI 上的任务为了让智能体学习人类的操作模式我需要分析真实用户是如何通过一系列界面交互来达成目标的这些原始操作序列往往包含探索错误修正和冗余步骤
**初始状态**
* [根据步骤1的图像简要描述与指令相关的初始屏幕状态提及关键可见元素]
# 目标
你的核心任务是**生成并输出一个详细的叙述性的思考过程**这个过程需要模拟你是如何分析给定的用户总体目标当前意图以及包含截图的操作序列的你需要在这个思考过程中阐述你是如何识别关键UI元素特别是鼠标交互点提炼用户真实意图过滤无效操作并最终理解核心操作步骤如何服务于用户目标的截图主要用于在分析时理解操作发生的具体上下文和交互对象
**演示动作序列**
1. **动作** `[标准化的动作类型例如 CLICK, TYPE, SCROLL, SELECT_TEXT]`
* **目标** `[描述此动作针对的具体UI元素参考其在对应图像中的外观或文本内容要精确例如1. 熟悉 C/C++开头的文本块标签为项目经历的按钮主内容区域的滚动条]`
* ** (如适用)** `[插入输入或选择的值]`
* *(基于步骤 [step_number] 的图像)*
2. **动作** `[标准化的动作类型]`
* **目标** `[描述此动作针对的具体UI元素]`
* ** (如适用)** `[插入值]`
* *(基于步骤 [step_number] 的图像)*
... 对Action_Sequence中的每一步重复
---
**最终状态可选但推荐**
* [根据最后一步动作后的图像描述结果状态表明任务完成或进入下一阶段]
### 输入信息(你将会收到以下信息)
1. **用户总体目标 (Overall Goal):** 用户想要最终完成的大任务
2. **用户当前意图 (Current Intent):** 用户在执行当前这段操作序列时想要直接达成的子目标或阶段性目标
3. **操作序列 (Action Sequence):** 一个按时间排序的操作列表每个操作包含类型位置如适用和对应的截图
生成时的关键注意事项
---
标准化动作 为动作使用一致的动词例如 CLICK, TYPE, SCROLL, DRAG, SELECT_TEXT
视觉定位 目标描述必须基于对应步骤图像中的视觉信息和任何提供的元素描述使其足够具体以便智能体能够定位
简洁性 信息要丰富但避免不必要的术语
准确性 确保生成的序列准确反映提供的Action_Sequence和视觉上下文
重点 突出与完成User_Instruction相关的交互点重点关注鼠标位置周围的情况不要关注其他无关的元素
### 任务指令
**请严格按照要求你的输出应该是一个单一的连贯的文本段落详细地描述你的完整思考过程不要使用项目符号编号列表或明显的章节标题让它读起来像一段自然流畅的内部思维独白或分析报告的草稿这个详细的思考过程描述将作为后续生成具体 Few-Shot 案例的基础**
**以下是引导你进行思考的叙述性框架请将你的分析融入这样的叙述中**
我的分析始于对用户目标的整体把握首先我会明确用户希望最终达成的**总体目标**[此处 mentally 插入总体目标]以及他们当前阶段声明的**意图**[此处 mentally 插入当前意图]理解这两者之间的关系至关重要我要判断当前的意图是否是实现总体目标的合理且必要的步骤例如如果总体目标是在线购买一件特定商品而当前意图是在搜索结果页筛选商品颜色那么这个意图显然服务于最终目标这有助于我将后续的操作分析锚定在正确的方向上避免偏离主题
接着我会仔细审视这个**当前意图**的精确性用户提供的意图有时可能比较笼统因此我会结合具体的**操作序列**来验证和细化它我会观察用户的实际动作他们点击了什么按钮在哪个输入框里打了字滚动了页面的哪个部分这些行为往往能揭示比声明更具体的意图比如如果意图是查看账户详情但操作序列显示用户点击了修改密码链接并开始输入那么我会将实际意图提炼为开始修改账户密码我会阐述我是如何基于[具体的操作细节如点击了某个按钮输入了特定文本]这些证据将初始意图修正或具体化为[提炼后的更精确意图]
在明确了更精确的用户意图后下一步是梳理整个**操作序列**识别并过滤掉**冗余或无效的操作**人类的操作常常不是最优路径可能包含重复点击打字错误后的修正无目的的页面滚动或短暂打开又关闭的窗口我会寻找这些模式比如用户可能在一个按钮上快速点击了两次但只有一次是必要的或者输入了一段文本然后用退格键删掉一部分再重新输入我会判断哪些操作对于达成刚才提炼出的精确意图并非必需并将它们从核心序列中剥离例如一系列的输入字符按退格键操作最终如果只是为了得到一个正确的单词我会将其合并为一次有效的输入[最终单词]操作并说明理由是之前的操作属于修正性质同样漫无目的的滚动或者点开菜单又立刻关闭的行为若与意图无直接关联也会被我视为干扰信息并加以忽略
最后在去除了干扰和冗余之后我会聚焦于**剩余的关键操作序列**对于这个精简后的序列中的每一步我会进行详尽的界面和操作分析我会明确指出**操作的类型**是点击输入滚动还是其他然后借助截图和上下文信息我会尽可能精确地描述被操作的**目标UI元素**它是一个标有登录的按钮吗还是一个带有搜索占位符的文本框或者是页面主要内容区域的滚动条我会记录下它的视觉特征文本标签或类型此外如果操作涉及具体**数值或内容**比如输入的文本选择的下拉选项滚动的方向我也会一并记录下来例如我会描述为用户点击了位于页面右上角的购物车图标按钮或者在标签为电子邮件地址的输入框中输入了文本example@email.com或者向下滚动了产品列表区域直到加载更多按钮可见通过这样对每一个关键步骤进行分解我就能清晰地构建出用户是如何通过与界面元素的有效交互来实现其特定意图的完整路径这整个连贯的思考和分析过程就构成了我对用户行为模式的深度理解
**请将你的实际分析内容按照上述思考流程和叙述风格整合成一个单一的文本段落作为输出**
"""
few_shot_prompt = """
# 任务: 生成 Few-Shot 示例用于智能体 System Prompt
# 背景:
你已完成对用户操作序列的深度分析并产出了一个详细的叙述性**思考过程**该思考过程明确了用户的总体目标提炼了具体的**操作目的 (精确意图)**并识别出了达成这些目的所必需的精简后的**关键动作序列**及其对应的**UI元素****最终状态**
# 目标:
基于你先前生成的**思考过程**结论为其中分析出的**每一个精确操作目的**生成一个结构化标准化的**Few-Shot 示例**这些示例将直接嵌入到多模态智能体的 **System Prompt** 作为核心指令的一部分指导其理解任务并模仿有效的操作模式因此生成的示例必须极其精确清晰具有普适性并严格遵循格式
# 输入假设:
此任务的唯一输入是你之前生成的**详细思考过程叙述文本**你将从中提取关键信息精确意图关键动作目标元素最终状态并进行格式化无需重新分析原始数据
# 输出格式要求:
请为思考过程中识别出的**每个操作目的**生成一个 JSON 对象格式的 Few-Shot 示例如果存在多个操作目的请将每个 JSON 对象用 `---` 分隔符清晰隔开
**每个 Few-Shot 示例必须严格遵循以下 JSON 结构:**
```json
{
"操作目的": "[从思考过程中提取的、已提炼的精确用户意图]",
"演示动作序列": [
{
"动作": "[标准化的动作类型 (例如: CLICK, TYPE, SCROLL_DOWN, SCROLL_UP, SELECT_OPTION, HOVER, DRAG_DROP, PRESS_ENTER, PRESS_TAB)]",
"目标": "[对UI元素的精确、可定位描述 (应包含文本标签、元素类型(如 button, input, link, checkbox, dropdown), aria-label, 或其他显著视觉/结构特征,确保智能体能大概率识别)]",
"": "[动作相关的具体值 (例如: TYPE 的文本内容, SELECT_OPTION 的选项文本, PRESS_KEY 的键名), 若无则省略此键]"
},
// ... 为该操作目的的关键非冗余动作序列中的每一步重复此对象 ...
{
"动作": "[最后一个关键动作类型]",
"目标": "[最后一个目标的精确描述]",
"": "[最后一个动作的值,如适用]"
}
],
"最终状态": "[描述在完成此'操作目的'后,界面上可直接观察到的、明确的结果或状态变化 (例如: '用户成功登录并跳转到个人主页', '商品列表已根据价格筛选并更新显示', '表单提交成功,页面显示确认信息')]"
}
```
--- [如果分析了多个操作目的请在此处使用分隔符然后开始下一个 JSON 对象]
生成关键注意事项与质量标准:
1. 忠于思考过程: 所有字段内容操作目的动作目标最终状态必须直接来源于或准确对应于你先前思考过程的结论
2. 动作标准化: 动作 字段必须使用预定义且一致的动作类型参考格式中的示例这对于智能体解析指令至关重要
3. 目标可定位性: 目标 描述是关键它需要足够丰富和具体以便智能体能够在不同的屏幕分辨率或微小布局变动下通过视觉识别和 DOM 结构分析如果可用可靠地定位到正确的UI元素优先使用稳定的标识符如明确的文本标签aria-label辅以元素类型和必要的上下文
4. 序列精炼: 演示动作序列 必须只包含达成 操作目的 的核心非冗余步骤正如在思考过程中提炼的那样
5. 状态明确: 最终状态 需要清晰描述与 操作目的 直接相关的可验证的界面变化结果
6. JSON 格式严格: 输出必须是有效的 JSON 格式每个示例一个 JSON 对象并使用 --- 分隔符
7. System Prompt 适用性: 产出的每一个示例都应被视为给智能体的直接指令或学习样本因此必须是高质量无歧义的
请基于你已有的思考过程分析结果立即开始生成符合上述所有要求的 Few-Shot 示例 JSON 对象
"""

View File

@ -0,0 +1,195 @@
"""
Recorder manager module for coordinating input and voice recording
"""
import os
import time
import json
import traceback
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot, QTimer
from src.core.input_listener import InputListener
from src.core.voice_recorder import VoiceRecorder
class RecorderManager(QObject):
"""
Manages and coordinates the recording of user inputs (keyboard/mouse)
and voice data, producing a synchronized mixed sequence
"""
recording_started = pyqtSignal()
recording_stopped = pyqtSignal()
sequence_updated = pyqtSignal(list)
status_changed = pyqtSignal(str)
def __init__(self):
"""Initialize recorder manager"""
super().__init__()
# Store the sequence
self.mixed_sequence = []
self.is_recording = False
# Create input listener
self.input_listener = InputListener()
self.input_listener.action_detected.connect(self.on_action_detected)
try:
# Create voice recorder
self.voice_recorder = VoiceRecorder()
self.voice_recorder.utterance_detected.connect(self.on_utterance_detected)
self.has_voice_recorder = True
except Exception as e:
self.status_changed.emit(f'警告:无法初始化语音录制器,将只记录键盘鼠标事件: {str(e)}')
print(f"Error initializing voice recorder: {e}")
traceback.print_exc()
self.has_voice_recorder = False
@pyqtSlot()
def start_recording(self):
"""Start recording both inputs and voice"""
if self.is_recording:
return
self.is_recording = True
self.mixed_sequence = []
# Start input listener
try:
self.input_listener.start_listen()
except Exception as e:
self.status_changed.emit(f'启动输入监听失败: {str(e)}')
print(f"Error starting input listener: {e}")
# Start voice recorder if available
if self.has_voice_recorder:
try:
self.voice_recorder.start_recording()
except Exception as e:
self.status_changed.emit(f'启动语音录制失败: {str(e)}')
print(f"Error starting voice recorder: {e}")
self.recording_started.emit()
@pyqtSlot()
def stop_recording(self):
"""Stop all recording activities"""
if not self.is_recording:
return
self.is_recording = False
# Stop input listener
try:
self.input_listener.stop_listen()
except Exception as e:
print(f"Error stopping input listener: {e}")
# Stop voice recorder if available
if self.has_voice_recorder:
try:
self.voice_recorder.stop_recording()
except Exception as e:
print(f"Error stopping voice recorder: {e}")
self.recording_stopped.emit()
@pyqtSlot(dict)
def on_action_detected(self, action_data):
"""
Handle detected input actions
Args:
action_data: Dictionary containing action details
"""
if not self.is_recording:
return
# Format into mixed sequence entry
action_entry = {
"type": "action",
"timestamp": time.time(),
"event": action_data["event"]
}
# Add position for mouse events
if action_data.get("position"):
action_entry["position"] = action_data["position"]
# Extract target information (would need additional image processing)
# For now, we're just storing the raw event data
action_entry["screenshot"] = action_data.get("base64_image", "")
# Add to sequence
self.mixed_sequence.append(action_entry)
self.sequence_updated.emit(self.mixed_sequence)
@pyqtSlot(dict)
def on_utterance_detected(self, utterance_data):
"""
Handle detected utterances
Args:
utterance_data: Dictionary containing utterance details
"""
if not self.is_recording:
return
# Add to sequence
self.mixed_sequence.append(utterance_data)
self.sequence_updated.emit(self.mixed_sequence)
self.status_changed.emit(f'已检测到语音: {utterance_data.get("text", "")}')
def save_sequence(self, filename):
"""
Save the recorded mixed sequence to a JSON file
Args:
filename: Output filename
Returns:
bool: True if saved successfully
"""
if not self.mixed_sequence:
return False
try:
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.mixed_sequence, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"Error saving sequence: {e}")
return False
def load_sequence(self, filename):
"""
Load a previously saved mixed sequence
Args:
filename: Input filename
Returns:
bool: True if loaded successfully
"""
if not os.path.exists(filename):
return False
try:
with open(filename, 'r', encoding='utf-8') as f:
self.mixed_sequence = json.load(f)
self.sequence_updated.emit(self.mixed_sequence)
return True
except Exception as e:
print(f"Error loading sequence: {e}")
return False
def get_workflow_segments(self):
"""
Extract workflow segments from the mixed sequence
Returns:
List of (utterance, actions) pairs
"""
from src.core.workflow_extractor import WorkflowExtractor
extractor = WorkflowExtractor()
return extractor.extract_workflows(self.mixed_sequence)
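
A minimal usage sketch for RecorderManager: it wires the signals the way RecorderPanel does, records for about ten seconds, then saves the sequence. It assumes the project root is on PYTHONPATH, a microphone and input-listening permissions are available, and that a Qt event loop is sufficient for InputListener/VoiceRecorder (their internals are not shown here):

```python
import sys
from PyQt6.QtCore import QCoreApplication, QTimer
from src.core.recorder_manager import RecorderManager

app = QCoreApplication(sys.argv)
manager = RecorderManager()

manager.status_changed.connect(lambda s: print("status:", s))
manager.sequence_updated.connect(lambda seq: print(f"{len(seq)} events recorded"))

manager.start_recording()

def finish():
    # Stop, persist, and leave the event loop (illustrative flow only).
    manager.stop_recording()
    manager.save_sequence("demo_sequence.json")
    app.quit()

QTimer.singleShot(10_000, finish)  # record for ~10 seconds
sys.exit(app.exec())
```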

src/core/voice_recorder.py (new file, +291 lines)
View File

@ -0,0 +1,291 @@
"""
Voice recording and speech-to-text module
"""
import time
import threading
import numpy as np
import pyaudio
import wave
import os
import tempfile
from pydub import AudioSegment
import speech_recognition as sr
import webrtcvad
from PyQt6.QtCore import QObject, pyqtSignal, pyqtSlot
from aip import AipSpeech
class VoiceRecorder(QObject):
"""
Records audio and performs speech-to-text conversion
Uses WebRTC VAD for voice activity detection
"""
utterance_detected = pyqtSignal(dict)
recording_finished = pyqtSignal()
def __init__(self, sample_rate=16000, chunk_size=320, vad_mode=3):
"""
Initialize the voice recorder
Args:
sample_rate: Audio sample rate (WebRTC VAD only supports 8000, 16000, 32000, 48000 Hz)
chunk_size: Audio chunk size (WebRTC VAD requires 10, 20, or 30 ms chunks)
vad_mode: WebRTC VAD aggressiveness mode (0-3)
"""
super().__init__()
self.sample_rate = sample_rate
self.chunk_size = chunk_size # 20ms at 16kHz
self.format = pyaudio.paInt16
self.channels = 1
# WebRTC Voice Activity Detection
self.vad = webrtcvad.Vad(vad_mode)
# Audio recording variables
self.audio = pyaudio.PyAudio()
self.stream = None
self.frames = []
# Speech recognition
self.recognizer = sr.Recognizer()
self.recognizer.energy_threshold = 300 # Adjust based on environment
self.offline_mode = False
self.speech_counter = 0
# Control flags
self.is_recording = False
self.recording_thread = None
# Create app temp directory
self.temp_dir = os.path.join(tempfile.gettempdir(), "automate_voice")
os.makedirs(self.temp_dir, exist_ok=True)
print(f"Using temp directory: {self.temp_dir}")
# Load Baidu speech API credentials from environment variables
app_id = os.environ.get('BAIDU_APP_ID')
api_key = os.environ.get('BAIDU_API_KEY')
secret_key = os.environ.get('BAIDU_SECRET_KEY')
if app_id and api_key and secret_key:
self.baidu_client = AipSpeech(app_id, api_key, secret_key)
print("Baidu speech recognition initialized")
else:
print("Warning: Baidu API credentials not found in environment variables")
print("Please set BAIDU_APP_ID, BAIDU_API_KEY, and BAIDU_SECRET_KEY")
self.baidu_client = None
self.offline_mode = True
def start_recording(self):
"""Start audio recording in a separate thread"""
if self.is_recording:
return
self.is_recording = True
self.frames = []
self.speech_counter = 0
try:
# Open audio stream
self.stream = self.audio.open(
format=self.format,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size
)
# Start recording thread
self.recording_thread = threading.Thread(target=self._record)
self.recording_thread.daemon = True
self.recording_thread.start()
except Exception as e:
print(f"Error starting recording: {e}")
self.is_recording = False
def _record(self):
"""Recording process that detects speech and performs STT"""
# Variables for speech detection
speech_frames = []
silence_counter = 0
speech_detected = False
speech_end_time = 0
try:
while self.is_recording:
# Read audio chunk
audio_chunk = self.stream.read(self.chunk_size, exception_on_overflow=False)
self.frames.append(audio_chunk)
# Check for voice activity
try:
is_speech = self.vad.is_speech(audio_chunk, self.sample_rate)
except Exception as e:
print(f"VAD error: {e}")
is_speech = False
if is_speech:
# Reset silence counter when speech is detected
silence_counter = 0
if not speech_detected:
# Mark beginning of speech
speech_detected = True
speech_frames = []
# Collect speech frames
speech_frames.append(audio_chunk)
else:
if speech_detected:
silence_counter += 1
speech_frames.append(audio_chunk)
# Consider speech ended after 1.5 seconds of silence (75 frames at 20ms per frame)
if silence_counter > 75:
speech_detected = False
silence_counter = 0
speech_end_time = time.time()
# Process the speech for transcription
self._process_speech(speech_frames)
speech_frames = []
except Exception as e:
print(f"Recording error: {e}")
finally:
if self.stream:
try:
self.stream.stop_stream()
self.stream.close()
except Exception as e:
print(f"Error closing stream: {e}")
def _process_speech(self, speech_frames):
"""
Process recorded speech frames to extract text
Args:
speech_frames: List of audio frames containing speech
"""
if not speech_frames:
return
self.speech_counter += 1
# Save speech to temporary WAV file
temp_file = os.path.join(self.temp_dir, f"speech_{self.speech_counter}.wav")
try:
wf = wave.open(temp_file, 'wb')
wf.setnchannels(self.channels)
wf.setsampwidth(self.audio.get_sample_size(self.format))
wf.setframerate(self.sample_rate)
wf.writeframes(b''.join(speech_frames))
wf.close()
# Convert to AudioSegment for potential preprocessing
audio_segment = AudioSegment.from_wav(temp_file)
if not self.offline_mode and self.baidu_client:
try:
# Use Baidu speech recognition
with open(temp_file, 'rb') as f:
audio_data = f.read()
# Call the Baidu speech API for recognition
result = self.baidu_client.asr(audio_data, 'wav', self.sample_rate, {
'dev_pid': 1537, # Mandarin with punctuation
})
if result['err_no'] == 0:
text = result['result'][0]
print(f"Recognized text: {text}")
else:
print(f"Baidu ASR error: {result['err_msg']}")
raise Exception(f"Baidu ASR error: {result['err_msg']}")
except (Exception, ConnectionError) as e:
print(f"Online speech recognition failed: {e}")
print("Switching to offline mode")
self.offline_mode = True
text = self._offline_speech_recognition(temp_file)
else:
# Offline mode or no Baidu client
text = self._offline_speech_recognition(temp_file)
if text:
# Emit the detected utterance with timestamp
self.utterance_detected.emit({
"type": "utterance",
"timestamp": time.time(),
"text": text
})
except Exception as e:
print(f"Speech recognition error: {e}")
# Still emit an utterance with placeholder text in case of error
self.utterance_detected.emit({
"type": "utterance",
"timestamp": time.time(),
"text": f"[语音识别失败 #{self.speech_counter}]"
})
def _offline_speech_recognition(self, audio_file):
"""
Perform offline speech recognition (fallback when online fails)
Args:
audio_file: Path to audio file
Returns:
str: Recognized text or empty string
"""
# For now, we'll just use a placeholder text since we don't have a real offline STT engine
# In a real implementation, you'd use something like Vosk or Whisper for offline recognition
return f"语音识别离线模式 #{self.speech_counter}"
def stop_recording(self):
"""Stop the audio recording"""
if not self.is_recording:
return
self.is_recording = False
if self.recording_thread:
self.recording_thread.join(timeout=2.0)
if self.stream:
try:
self.stream.stop_stream()
self.stream.close()
self.stream = None
except Exception as e:
print(f"Error stopping recording: {e}")
self.recording_finished.emit()
def save_recording(self, filename):
"""
Save the full recording to a WAV file
Args:
filename: Output filename
"""
if not self.frames:
return False
try:
wf = wave.open(filename, 'wb')
wf.setnchannels(self.channels)
wf.setsampwidth(self.audio.get_sample_size(self.format))
wf.setframerate(self.sample_rate)
wf.writeframes(b''.join(self.frames))
wf.close()
return True
except Exception as e:
print(f"Error saving recording: {e}")
return False
def __del__(self):
"""Clean up resources"""
self.stop_recording()
try:
self.audio.terminate()
except Exception:
pass
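
Both recorders lean on WebRTC VAD's frame constraints: frames must be 10, 20, or 30 ms of 16-bit mono PCM at 8, 16, 32, or 48 kHz, which is why VoiceRecorder uses chunk_size=320 (20 ms at 16 kHz) and AudioRecorder derives its chunk from chunk_duration_ms. A quick sketch that checks a frame of digital silence against the VAD (assumes the webrtcvad-wheels package from requirements.txt is installed):

```python
import webrtcvad

SAMPLE_RATE = 16000                          # Hz; must be 8000/16000/32000/48000
FRAME_MS = 20                                # must be 10, 20, or 30 ms
CHUNK_SIZE = SAMPLE_RATE * FRAME_MS // 1000  # 320 samples, matching VoiceRecorder
FRAME_BYTES = CHUNK_SIZE * 2                 # 16-bit (2-byte) mono samples

vad = webrtcvad.Vad(3)                       # aggressiveness 0-3, as in the recorders
silence = b"\x00" * FRAME_BYTES

print("frame bytes:", FRAME_BYTES)
print("is_speech:", vad.is_speech(silence, SAMPLE_RATE))  # expected: False for silence
```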

View File

@ -0,0 +1,141 @@
"""
Workflow extraction module that processes mixed sequences of utterances and actions
to generate intent-based workflow segments
"""
class WorkflowExtractor:
"""
Extracts workflows from mixed sequences of utterances and actions
by segmenting based on utterance boundaries
"""
def __init__(self):
"""Initialize workflow extractor"""
pass
def process_mixed_sequence(self, sequence):
"""
Process a mixed sequence of utterances and actions
Args:
sequence: List of dictionaries containing utterances and actions
Each item must have 'type' ('utterance' or 'action') and 'timestamp'
Returns:
List of (intent, actions) tuples representing workflow segments
"""
if not sequence:
return []
# Sort by timestamp to ensure proper ordering
sorted_sequence = sorted(sequence, key=lambda x: x['timestamp'])
# First pass: Merge consecutive utterances with no actions in between
merged_sequence = []
i = 0
while i < len(sorted_sequence):
current_item = sorted_sequence[i].copy() # Create a copy to avoid modifying original
# If current item is an utterance, check if we can merge with next utterances
if current_item['type'] == 'utterance':
merged_text = current_item['text']
next_i = i + 1
# Look ahead for the next items
has_actions_between = False
last_utterance_idx = i
while next_i < len(sorted_sequence):
next_item = sorted_sequence[next_i]
# If we found an action, mark that there are actions between utterances
if next_item['type'] == 'action':
has_actions_between = True
# If we found another utterance
elif next_item['type'] == 'utterance':
# If there were no actions between, merge this utterance
if not has_actions_between:
merged_text += " " + next_item['text']
last_utterance_idx = next_i
else:
# There were actions between, stop looking
break
next_i += 1
# Update the merged text in the current utterance if we found utterances to merge
if last_utterance_idx > i:
current_item['text'] = merged_text
# Skip the merged utterances in the next iteration
i = last_utterance_idx
merged_sequence.append(current_item)
i += 1
# Find all utterance indices in the merged sequence
utterance_indices = [
i for i, item in enumerate(merged_sequence)
if item['type'] == 'utterance'
]
if not utterance_indices:
# No utterances found, return empty result
return []
# Extract workflow segments based on utterance boundaries
segments = []
for i in range(len(utterance_indices)):
# Current utterance index
curr_idx = utterance_indices[i]
# Extract the utterance
utterance = merged_sequence[curr_idx]
# Determine segment end (next utterance or end of sequence)
next_idx = utterance_indices[i+1] if i+1 < len(utterance_indices) else len(merged_sequence)
# Extract the actions between this utterance and the next
actions = [
merged_sequence[j] for j in range(curr_idx+1, next_idx)
if merged_sequence[j]['type'] == 'action'
]
# Add the segment to the result
segments.append((utterance, actions))
return segments
def format_sequence_for_analysis(self, utterance, actions):
"""
Format a (utterance, actions) pair for LLM analysis
Args:
utterance: The utterance dictionary
actions: List of action dictionaries
Returns:
Dictionary with formatted utterance and actions
"""
return {
"intent": utterance.get('text', ''),
"utterance": utterance,
"actions": actions
}
def extract_workflows(self, mixed_sequence):
"""
Extract workflows from a mixed sequence and format for analysis
Args:
mixed_sequence: List of dictionaries containing utterances and actions
Returns:
List of formatted workflow segments ready for LLM analysis
"""
segments = self.process_mixed_sequence(mixed_sequence)
return [
self.format_sequence_for_analysis(utterance, actions)
for utterance, actions in segments
]
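
A short, dependency-free example of how extract_workflows() segments a mixed sequence: consecutive utterances with no actions in between are merged, and each (merged) utterance owns the actions that follow it up to the next utterance. The timestamps and texts are made up; the import path assumes the module layout introduced in this commit:

```python
from src.core.workflow_extractor import WorkflowExtractor

mixed_sequence = [
    {"type": "utterance", "text": "open the browser", "timestamp": 1.0},
    {"type": "utterance", "text": "and go to the login page", "timestamp": 1.5},  # merged with the previous one
    {"type": "action", "event": "left click", "timestamp": 2.0},
    {"type": "action", "event": "type 'example.com/login'", "timestamp": 3.0},
    {"type": "utterance", "text": "now sign in", "timestamp": 4.0},
    {"type": "action", "event": "type 'alice'", "timestamp": 5.0},
]

extractor = WorkflowExtractor()
for segment in extractor.extract_workflows(mixed_sequence):
    print(segment["intent"], "->", [a["event"] for a in segment["actions"]])

# Expected output:
#   open the browser and go to the login page -> ['left click', "type 'example.com/login'"]
#   now sign in -> ["type 'alice'"]
```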

View File

@ -1,7 +1,10 @@
"""
Main application window for the AutoMate interface
"""
from PyQt6.QtWidgets import QMainWindow, QWidget, QHBoxLayout, QVBoxLayout, QApplication
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QHBoxLayout, QVBoxLayout, QApplication
)
from PyQt6.QtCore import Qt
from src.ui.chat_area import ChatArea
from src.ui.input_area import InputArea
from src.ui.profile_widget import ProfileWidget
@ -32,6 +35,9 @@ class MainWindow(QMainWindow):
y = (screen.height() - window_size.height()) // 2
self.move(x, y)
# Create mini window for demonstration mode first
self.mini_window = MiniWindow(self.finish_demonstration)
# Create central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
@ -56,9 +62,6 @@ class MainWindow(QMainWindow):
# Create chat area
self.chat_area = ChatArea()
# Create mini window for demonstration mode
self.mini_window = MiniWindow(self.finish_demonstration)
# Create conversation manager
self.conversation_manager = ConversationManager(self.chat_area, self.mini_window)
@ -81,4 +84,4 @@ class MainWindow(QMainWindow):
def finish_demonstration(self):
"""Finish demonstration callback for mini window"""
self.conversation_manager.finish_demonstration()

View File

@ -1,63 +1,66 @@
"""
Mini window component for task demonstration mode
Mini window module for displaying a small control window during demonstrations
"""
from PyQt6.QtWidgets import (QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QApplication)
from PyQt6.QtCore import Qt
from PyQt6.QtGui import QFont
from PyQt6.QtWidgets import (
QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QLabel, QPushButton, QFrame
)
from PyQt6.QtCore import Qt, QPoint
from PyQt6.QtGui import QFont, QCursor, QPixmap
class MiniWindow(QMainWindow):
"""
Small floating window displayed during task demonstration
Mini floating window displayed during demonstration recording
Provides status information and finish button
"""
def __init__(self, finish_callback, parent=None):
"""
Initialize the mini window
Args:
finish_callback: Function to call when demonstration is finished
finish_callback: Function to call when finish button is clicked
parent: Parent widget
"""
super().__init__(parent)
self.setWindowTitle("Learning Mode")
self.setFixedSize(250, 150)
super().__init__(parent, Qt.WindowType.FramelessWindowHint | Qt.WindowType.WindowStaysOnTopHint)
# Position in bottom-right corner
desktop = QApplication.primaryScreen().availableGeometry()
self.move(desktop.width() - 270, desktop.height() - 170)
self.setWindowTitle("Recording")
self.setStyleSheet("background-color: white;")
# Set frameless and always-on-top flags
self.setWindowFlags(
Qt.WindowType.FramelessWindowHint |
Qt.WindowType.WindowStaysOnTopHint
)
# Set small window size
self.resize(300, 150)
# Set window style
self.setStyleSheet("""
QMainWindow {
background-color: #fff8f8;
border: 2px solid #ffcdd2;
border-radius: 10px;
}
""")
# Position in bottom right corner
screen_geometry = self.screen().geometry()
self.move(screen_geometry.width() - 320, screen_geometry.height() - 270)
# Create central widget
mini_central = QWidget()
self.setCentralWidget(mini_central)
# For window dragging
self.dragging = False
self.offset = QPoint()
# Create layout
mini_layout = QVBoxLayout(mini_central)
# Central widget
central_widget = QWidget()
self.setCentralWidget(central_widget)
# Create header with avatar and title
mini_header = QWidget()
# Main layout
mini_layout = QVBoxLayout(central_widget)
mini_layout.setContentsMargins(10, 10, 10, 10)
mini_layout.setSpacing(10)
# Create the header section
mini_header = QFrame()
mini_header.setFrameShape(QFrame.Shape.NoFrame)
mini_header.setStyleSheet("background-color: #f5f5f5; border-radius: 8px;")
header_layout = QHBoxLayout(mini_header)
header_layout.setContentsMargins(10, 5, 10, 5)
# Avatar placeholder
self.mini_avatar = QLabel()
# Avatar will be set from the main window
self.mini_avatar.setFixedSize(30, 30)
self.mini_avatar.setStyleSheet("background-color: #e0e0e0; border-radius: 15px;")
header_layout.addWidget(self.mini_avatar)
mini_title = QLabel("Learning in progress...")
# Title
mini_title = QLabel("Recording Demo")
mini_title.setFont(QFont("Arial", 10, QFont.Weight.Bold))
mini_title.setStyleSheet("color: #d32f2f;")
header_layout.addWidget(mini_title)
@ -70,6 +73,12 @@ class MiniWindow(QMainWindow):
self.status_label.setFont(QFont("Arial", 10))
self.status_label.setStyleSheet("color: #333333; margin: 10px;")
# New: voice status display
self.voice_status_label = QLabel("Voice Recording: Ready")
self.voice_status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
self.voice_status_label.setFont(QFont("Arial", 9))
self.voice_status_label.setStyleSheet("color: #1976d2; margin: 0px 10px;")
# Finish button
finish_button = QPushButton("Finish Demo")
finish_button.setFont(QFont("Arial", 10, QFont.Weight.Bold))
@ -94,6 +103,7 @@ class MiniWindow(QMainWindow):
# Add to layout
mini_layout.addWidget(mini_header)
mini_layout.addWidget(self.status_label)
mini_layout.addWidget(self.voice_status_label) # Add the voice status label
mini_layout.addWidget(finish_button)
def set_avatar(self, avatar_pixmap):
@ -106,4 +116,30 @@ class MiniWindow(QMainWindow):
scaled_avatar = avatar_pixmap.scaled(30, 30, Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation)
self.mini_avatar.setPixmap(scaled_avatar)
self.mini_avatar.setFixedSize(30, 30)
def set_voice_status(self, status):
"""
Set the voice status text
Args:
status: Voice status text
"""
self.voice_status_label.setText(f"Voice: {status}")
# Mouse event handling for window dragging
def mousePressEvent(self, event):
"""Handle mouse press events for dragging"""
if event.button() == Qt.MouseButton.LeftButton:
self.dragging = True
self.offset = event.position().toPoint()
def mouseMoveEvent(self, event):
"""Handle mouse move events for dragging"""
if self.dragging:
self.move(self.mapToGlobal(event.position().toPoint() - self.offset))
def mouseReleaseEvent(self, event):
"""Handle mouse release events for dragging"""
if event.button() == Qt.MouseButton.LeftButton:
self.dragging = False

src/ui/recorder_panel.py (new file, +358 lines)
View File

@ -0,0 +1,358 @@
"""
Recorder panel UI for controlling voice and input recording
"""
import os
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QPushButton,
QLabel, QFileDialog, QListWidget, QListWidgetItem,
QSplitter, QTextEdit, QMessageBox, QStatusBar
)
from PyQt6.QtCore import Qt, pyqtSlot, QSize
from PyQt6.QtGui import QFont, QColor
from src.core.recorder_manager import RecorderManager
class RecorderPanel(QWidget):
"""
Panel for recording user actions and voice, and visualizing
the extracted workflows
"""
def __init__(self, parent=None):
"""Initialize recorder panel"""
super().__init__(parent)
self.recorder_manager = RecorderManager()
# Connect signals
self.recorder_manager.recording_started.connect(self.on_recording_started)
self.recorder_manager.recording_stopped.connect(self.on_recording_stopped)
self.recorder_manager.sequence_updated.connect(self.on_sequence_updated)
self.recorder_manager.status_changed.connect(self.on_status_changed)
self.init_ui()
def init_ui(self):
"""Initialize the user interface"""
main_layout = QVBoxLayout(self)
# Control buttons
control_layout = QHBoxLayout()
self.record_button = QPushButton("开始录制")
self.record_button.clicked.connect(self.toggle_recording)
self.record_button.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border-radius: 4px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #45a049;
}
QPushButton:pressed {
background-color: #3d8b40;
}
""")
self.save_button = QPushButton("保存录制")
self.save_button.clicked.connect(self.save_recording)
self.save_button.setEnabled(False)
self.load_button = QPushButton("加载录制")
self.load_button.clicked.connect(self.load_recording)
self.analyze_button = QPushButton("分析工作流")
self.analyze_button.clicked.connect(self.analyze_workflow)
self.analyze_button.setEnabled(False)
control_layout.addWidget(self.record_button)
control_layout.addWidget(self.save_button)
control_layout.addWidget(self.load_button)
control_layout.addWidget(self.analyze_button)
main_layout.addLayout(control_layout)
# Status label
self.status_label = QLabel("就绪")
self.status_label.setStyleSheet("font-weight: bold; padding: 5px; color: #333;")
main_layout.addWidget(self.status_label)
# Create splitter for sequence and workflow views
splitter = QSplitter(Qt.Orientation.Horizontal)
splitter.setHandleWidth(1)
splitter.setStyleSheet("""
QSplitter::handle {
background-color: #dddddd;
}
""")
# Mixed sequence list
sequence_container = QWidget()
sequence_layout = QVBoxLayout(sequence_container)
sequence_layout.setContentsMargins(0, 0, 0, 0)
sequence_header = QLabel("录制序列")
sequence_header.setStyleSheet("font-weight: bold; font-size: 14px; padding: 5px; background-color: #f5f5f5;")
sequence_layout.addWidget(sequence_header)
self.sequence_list = QListWidget()
self.sequence_list.setMinimumWidth(400)
self.sequence_list.setAlternatingRowColors(True)
self.sequence_list.setStyleSheet("""
QListWidget {
border: 1px solid #dddddd;
border-radius: 4px;
padding: 2px;
background-color: white;
}
QListWidget::item {
padding: 4px;
border-bottom: 1px solid #eeeeee;
}
QListWidget::item:alternate {
background-color: #f9f9f9;
}
""")
sequence_layout.addWidget(self.sequence_list)
# Workflow results view
workflow_container = QWidget()
workflow_layout = QVBoxLayout(workflow_container)
workflow_layout.setContentsMargins(0, 0, 0, 0)
workflow_header = QLabel("工作流分析")
workflow_header.setStyleSheet("font-weight: bold; font-size: 14px; padding: 5px; background-color: #f5f5f5;")
workflow_layout.addWidget(workflow_header)
self.workflow_text = QTextEdit()
self.workflow_text.setReadOnly(True)
self.workflow_text.setStyleSheet("""
QTextEdit {
border: 1px solid #dddddd;
border-radius: 4px;
padding: 8px;
background-color: white;
font-family: Arial, sans-serif;
}
""")
workflow_layout.addWidget(self.workflow_text)
# Add containers to splitter
splitter.addWidget(sequence_container)
splitter.addWidget(workflow_container)
# Set splitter proportions
splitter.setSizes([400, 400])
main_layout.addWidget(splitter, 1) # 1 = stretch factor
# Status bar for detailed status
self.status_bar = QStatusBar()
self.status_bar.setSizeGripEnabled(False)
self.status_bar.setStyleSheet("""
QStatusBar {
background-color: #f5f5f5;
color: #333333;
border-top: 1px solid #dddddd;
}
""")
main_layout.addWidget(self.status_bar)
# Set layout
self.setLayout(main_layout)
self.setMinimumSize(800, 600)
# Initial status
self.status_bar.showMessage('系统就绪,点击"开始录制"按钮开始捕获键盘鼠标和语音')
def toggle_recording(self):
"""Toggle recording state"""
if not self.recorder_manager.is_recording:
self.recorder_manager.start_recording()
else:
self.recorder_manager.stop_recording()
@pyqtSlot()
def on_recording_started(self):
"""Handle recording started event"""
self.record_button.setText("停止录制")
self.record_button.setStyleSheet("""
QPushButton {
background-color: #f44336;
color: white;
border-radius: 4px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #e53935;
}
QPushButton:pressed {
background-color: #d32f2f;
}
""")
self.save_button.setEnabled(False)
self.load_button.setEnabled(False)
self.analyze_button.setEnabled(False)
self.status_label.setText("正在录制中...")
self.status_bar.showMessage('正在录制,系统将捕获您的键盘鼠标操作和语音指令...')
self.sequence_list.clear()
self.workflow_text.clear()
@pyqtSlot()
def on_recording_stopped(self):
"""Handle recording stopped event"""
self.record_button.setText("开始录制")
self.record_button.setStyleSheet("""
QPushButton {
background-color: #4CAF50;
color: white;
border-radius: 4px;
padding: 8px 16px;
font-weight: bold;
}
QPushButton:hover {
background-color: #45a049;
}
QPushButton:pressed {
background-color: #3d8b40;
}
""")
self.save_button.setEnabled(True)
self.load_button.setEnabled(True)
self.analyze_button.setEnabled(True)
self.status_label.setText("录制完成")
self.status_bar.showMessage('录制已完成,您可以保存或分析工作流')
@pyqtSlot(list)
def on_sequence_updated(self, sequence):
"""
Update the sequence list when new events are recorded
Args:
sequence: The updated mixed sequence
"""
self.sequence_list.clear()
for item in sequence:
item_type = item.get("type", "")
timestamp = item.get("timestamp", 0)
if item_type == "utterance":
text = item.get("text", "")
list_item = QListWidgetItem(f"[语音] {text}")
list_item.setBackground(QColor("#e8f5e9")) # Light green background
list_item.setForeground(QColor("#2e7d32")) # Dark green text
font = list_item.font()
font.setBold(True)
list_item.setFont(font)
elif item_type == "action":
event = item.get("event", "")
position = item.get("position", None)
if position:
position_text = f" @ ({position[0]}, {position[1]})"
else:
position_text = ""
list_item = QListWidgetItem(f"[动作] {event}{position_text}")
else:
continue
self.sequence_list.addItem(list_item)
# Scroll to bottom
self.sequence_list.scrollToBottom()
# Update status
self.status_bar.showMessage(f'已录制 {len(sequence)} 个事件')
@pyqtSlot(str)
def on_status_changed(self, status):
"""
Update status when it changes
Args:
status: New status message
"""
self.status_bar.showMessage(status, 5000) # Show for 5 seconds
def save_recording(self):
"""Save the current recording to a file"""
if not self.recorder_manager.mixed_sequence:
QMessageBox.warning(self, "警告", "没有可保存的录制数据")
return
filename, _ = QFileDialog.getSaveFileName(
self, "保存录制", "", "JSON Files (*.json)"
)
if filename:
success = self.recorder_manager.save_sequence(filename)
if success:
self.status_label.setText(f"已保存到 {filename}")
self.status_bar.showMessage(f'成功保存录制到: {filename}')
else:
QMessageBox.critical(self, "错误", "保存录制失败")
def load_recording(self):
"""Load a recording from a file"""
filename, _ = QFileDialog.getOpenFileName(
self, "加载录制", "", "JSON Files (*.json)"
)
if filename:
success = self.recorder_manager.load_sequence(filename)
if success:
self.status_label.setText(f"已加载 {filename}")
self.status_bar.showMessage(f'成功加载录制: {filename}')
self.analyze_button.setEnabled(True)
self.save_button.setEnabled(True)
else:
QMessageBox.critical(self, "错误", "加载录制失败")
def analyze_workflow(self):
"""Analyze the current recording and extract workflows"""
if not self.recorder_manager.mixed_sequence:
QMessageBox.warning(self, "警告", "没有可分析的录制数据")
return
self.status_bar.showMessage('正在分析工作流...')
try:
workflows = self.recorder_manager.get_workflow_segments()
if not workflows:
self.workflow_text.setText("未找到工作流片段")
self.status_bar.showMessage('分析完成,未找到工作流片段')
return
# Format and display workflows
result_text = "# 提取的工作流\n\n"
for i, workflow in enumerate(workflows):
intent = workflow.get("intent", "")
actions = workflow.get("actions", [])
result_text += f"## 片段 {i+1}: \"{intent}\"\n\n"
if actions:
result_text += "操作序列:\n"
for j, action in enumerate(actions):
event = action.get("event", "")
result_text += f"{j+1}. {event}\n"
else:
result_text += "没有相关的操作\n"
result_text += "\n---\n\n"
self.workflow_text.setText(result_text)
self.status_bar.showMessage(f'工作流分析完成,找到 {len(workflows)} 个片段')
except Exception as e:
self.status_bar.showMessage(f'分析工作流时出错: {str(e)}')
QMessageBox.critical(self, "错误", f'分析工作流失败: {str(e)}')

src/ui/streaming_chat.py (new file, +1 line)
View File

@ -0,0 +1 @@

src/utils/audio_recorder.py (new file, +298 lines)
View File

@ -0,0 +1,298 @@
"""
Audio recording and speech recognition module
"""
import os
import time
import wave
import threading
import tempfile
import queue
import numpy as np
import pyaudio
import webrtcvad
import speech_recognition as sr
from PyQt6.QtCore import QObject, pyqtSignal
from aip import AipSpeech
class AudioRecorder(QObject):
"""
Class for recording audio and performing speech recognition
Emits signals when utterances are detected
"""
utterance_detected = pyqtSignal(dict)
recording_status = pyqtSignal(str)
terminated = pyqtSignal()
def __init__(self, vad_level=3, sample_rate=16000, chunk_duration_ms=30):
"""
Initialize audio recorder with VAD and ASR
Args:
vad_level: VAD aggressiveness (0-3)
sample_rate: Audio sample rate (Hz)
chunk_duration_ms: Chunk duration (ms)
"""
super().__init__()
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_size = int(self.sample_rate * self.chunk_duration_ms / 1000)
self.vad = webrtcvad.Vad(vad_level)
# Audio recording setup
self.audio_format = pyaudio.paInt16
self.channels = 1
self.recording = False
# Initialize recognizer
self.recognizer = sr.Recognizer()
self.recognizer.energy_threshold = 300 # Adjust based on environment
# Threading objects
self.recording_thread = None
self.transcription_thread = None
self.audio_queue = queue.Queue()
self.stop_event = threading.Event()
# Temporary directories for audio files
self.temp_dir = tempfile.mkdtemp()
self.current_audio_file = None
self.active_audio_file = None
# Initialize the Baidu speech client
app_id = os.environ.get('BAIDU_APP_ID')
api_key = os.environ.get('BAIDU_API_KEY')
secret_key = os.environ.get('BAIDU_SECRET_KEY')
if app_id and api_key and secret_key:
self.baidu_client = AipSpeech(app_id, api_key, secret_key)
print("AudioRecorder: Baidu speech recognition initialized")
self.use_baidu = True
else:
print("AudioRecorder: Warning - Baidu API credentials not found in environment variables")
self.baidu_client = None
self.use_baidu = False
def _recording_worker(self):
"""Worker thread for audio recording with VAD"""
p = pyaudio.PyAudio()
stream = p.open(
format=self.audio_format,
channels=self.channels,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size
)
self.recording_status.emit("Ready for voice commands")
# VAD state
speech_frames = []
is_speech = False
silent_chunks = 0
speech_chunks = 0
max_silent_chunks = int(1000 / self.chunk_duration_ms) # 1-second silence
try:
while not self.stop_event.is_set():
audio_chunk = stream.read(self.chunk_size, exception_on_overflow=False)
is_speech_frame = self.vad.is_speech(audio_chunk, self.sample_rate)
# State machine for speech detection
if is_speech_frame:
if not is_speech:
# Transition from silence to speech
is_speech = True
speech_frames = [] # Clear previous frames
self.recording_status.emit("Listening...")
# Speech is present again, so reset the silence counter
silent_chunks = 0
speech_chunks += 1
speech_frames.append(audio_chunk)
else:
if is_speech:
# We are in the speech state but received a silence frame;
# keep a short tail of silence so the utterance is not clipped
silent_chunks += 1
speech_frames.append(audio_chunk)
if silent_chunks > max_silent_chunks:
# End of speech
is_speech = False
silent_chunks = 0
speech_chunks = 0
if len(speech_frames) > 15: # Filter out very short sounds
timestamp = time.time()
audio_file = os.path.join(self.temp_dir, f"speech_{timestamp}.wav")
self._save_audio(audio_file, speech_frames)
self.audio_queue.put((audio_file, timestamp))
self.recording_status.emit("Processing speech...")
else:
# We're in the silence state; nothing to do until speech starts
pass
finally:
stream.stop_stream()
stream.close()
p.terminate()
self.terminated.emit()
def _transcription_worker(self):
"""Worker thread for ASR"""
while not self.stop_event.is_set():
try:
audio_file, timestamp = self.audio_queue.get(timeout=1)
try:
with sr.AudioFile(audio_file) as source:
audio_data = self.recognizer.record(source)
# Switched from Google to Baidu speech recognition
# text = self.recognizer.recognize_google(audio_data)
# Save the audio to a temporary file for the Baidu API
temp_wav = os.path.join(tempfile.gettempdir(), "temp_speech.wav")
with wave.open(temp_wav, 'wb') as wf:
wf.setnchannels(1)
wf.setsampwidth(2) # 16-bit
wf.setframerate(16000)
wf.writeframes(audio_data.get_raw_data(convert_rate=16000, convert_width=2)) # raw PCM; get_wav_data() would embed a second WAV header
# Read the file back and call the Baidu API
with open(temp_wav, 'rb') as f:
audio_bytes = f.read()
# Call the Baidu speech API
result = self.baidu_client.asr(audio_bytes, 'wav', 16000, {
'dev_pid': 1537, # Mandarin with punctuation
})
if result['err_no'] == 0:
text = result['result'][0]
else:
raise Exception(f"Baidu ASR error: {result['err_msg']}")
if text:
# Emit the utterance with timestamp
self.utterance_detected.emit({
"type": "utterance",
"timestamp": timestamp,
"text": text
})
self.recording_status.emit(f"Detected: \"{text}\"")
except Exception as e:
print(f"Error recognizing speech: {e}")
# Clean up temporary audio file
try:
os.remove(audio_file)
except (PermissionError, FileNotFoundError):
pass
self.audio_queue.task_done()
except queue.Empty:
# Timeout, just continue
pass
def _save_audio(self, file_path, frames):
"""Save audio frames to WAV file"""
with wave.open(file_path, 'wb') as wf:
wf.setnchannels(self.channels)
wf.setsampwidth(pyaudio.PyAudio().get_sample_size(self.audio_format))
wf.setframerate(self.sample_rate)
wf.writeframes(b''.join(frames))
def start_recording(self):
"""Start audio recording and speech recognition threads"""
if self.recording:
return
self.recording = True
self.stop_event.clear()
# Start recording thread
self.recording_thread = threading.Thread(target=self._recording_worker)
self.recording_thread.daemon = True
self.recording_thread.start()
# Start transcription thread
self.transcription_thread = threading.Thread(target=self._transcription_worker)
self.transcription_thread.daemon = True
self.transcription_thread.start()
def stop_recording(self):
"""Stop all recording and transcription threads"""
if not self.recording:
return
self.recording = False
self.stop_event.set()
# Wait for threads to finish
if self.recording_thread and self.recording_thread.is_alive():
self.recording_thread.join(timeout=2)
if self.transcription_thread and self.transcription_thread.is_alive():
self.transcription_thread.join(timeout=2)
self.terminated.emit()
def __del__(self):
"""Cleanup temporary files on deletion"""
self.stop_recording()
# Clean up temp directory
try:
import shutil
shutil.rmtree(self.temp_dir, ignore_errors=True)
except:
pass
def recognize_audio(self, audio_file_path=None, audio_data=None):
"""
Perform speech recognition using the Baidu API
"""
try:
# Prefer raw audio data when it is provided
if audio_data is not None:
if self.use_baidu and self.baidu_client:
# Recognize with the Baidu API
result = self.baidu_client.asr(audio_data, 'pcm', 16000, {
'dev_pid': 1537, # Mandarin with punctuation
})
if result['err_no'] == 0:
return result['result'][0]
else:
print(f"Baidu ASR error: {result['err_msg']}")
# If the Baidu API fails, fall back to the original recognition method
else:
# Use the original recognition method
# ... original code ...
pass
# If an audio file path was provided instead
elif audio_file_path is not None and os.path.exists(audio_file_path):
if self.use_baidu and self.baidu_client:
with open(audio_file_path, 'rb') as f:
audio_data = f.read()
result = self.baidu_client.asr(audio_data, 'wav', 16000, {
'dev_pid': 1537, # Mandarin with punctuation
})
if result['err_no'] == 0:
return result['result'][0]
else:
print(f"Baidu ASR error: {result['err_msg']}")
# If the Baidu API fails, fall back to the original recognition method
else:
# Use the original recognition method
# ... original code ...
pass
except Exception as e:
print(f"Speech recognition error: {e}")
return None
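
AudioRecorder reads its Baidu credentials from BAIDU_APP_ID, BAIDU_API_KEY and BAIDU_SECRET_KEY and quietly disables Baidu ASR when any of them is missing. A headless wiring sketch (illustrative only; a microphone and a running Qt event loop are assumed, since the worker threads deliver results back through queued signal connections):

```python
import os
import sys
from PyQt6.QtCore import QCoreApplication, QTimer
from src.utils.audio_recorder import AudioRecorder

for var in ("BAIDU_APP_ID", "BAIDU_API_KEY", "BAIDU_SECRET_KEY"):
    if not os.environ.get(var):
        print(f"warning: {var} is not set; Baidu ASR will be disabled")

app = QCoreApplication(sys.argv)
recorder = AudioRecorder()
recorder.recording_status.connect(lambda s: print("status:", s))
recorder.utterance_detected.connect(lambda u: print("utterance:", u["text"]))

recorder.start_recording()
QTimer.singleShot(15_000, lambda: (recorder.stop_recording(), app.quit()))  # ~15 s demo
sys.exit(app.exec())
```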

streaming_solution.py (new file, +1 line)
View File

@ -0,0 +1 @@

temp_method.py (new binary file)

Binary file not shown.