From 4532305f5a26e0a8cd5521207fcbc6fe48bbb1e0 Mon Sep 17 00:00:00 2001 From: Wendong Date: Fri, 14 Mar 2025 19:54:51 +0800 Subject: [PATCH] update wendong --- owl/webapp_zh.py | 418 ++++++++++++++--------------------------------- 1 file changed, 122 insertions(+), 296 deletions(-) diff --git a/owl/webapp_zh.py b/owl/webapp_zh.py index a03b7ed..2c825cc 100644 --- a/owl/webapp_zh.py +++ b/owl/webapp_zh.py @@ -16,12 +16,13 @@ import signal import sys import subprocess import platform +import re os.environ['PYTHONIOENCODING'] = 'utf-8' # 配置日志系统 def setup_logging(): - """配置日志系统,将日志输出到文件和内存队列""" + """配置日志系统,将日志输出到文件和内存队列以及控制台""" # 创建logs目录(如果不存在) logs_dir = os.path.join(os.path.dirname(__file__), "logs") os.makedirs(logs_dir, exist_ok=True) @@ -43,14 +44,18 @@ def setup_logging(): file_handler = logging.FileHandler(log_file, encoding='utf-8', mode='a') file_handler.setLevel(logging.INFO) + # 创建控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + # 创建格式化器 formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) # 添加处理器到根日志记录器 root_logger.addHandler(file_handler) - - + root_logger.addHandler(console_handler) logging.info("日志系统已初始化,日志文件: %s", log_file) return log_file @@ -58,10 +63,10 @@ def setup_logging(): # 全局变量 LOG_FILE = None LOG_QUEUE = queue.Queue() +LOG_QUEUE2 = queue.Queue() # 对话记录的队列 STOP_LOG_THREAD = threading.Event() CURRENT_PROCESS = None # 用于跟踪当前运行的进程 STOP_REQUESTED = threading.Event() # 用于标记是否请求停止 -CONVERSATION_UPDATE_QUEUE = queue.Queue() # 用于实时更新对话历史的队列 # 日志读取和更新函数 def log_reader_thread(log_file): @@ -75,26 +80,29 @@ def log_reader_thread(log_file): line = f.readline() if line: LOG_QUEUE.put(line) + LOG_QUEUE2.put(line) # 同时添加到第二个队列 else: # 没有新行,等待一小段时间 time.sleep(0.1) except Exception as e: logging.error(f"日志读取线程出错: {str(e)}") -def get_latest_logs(max_lines=100): +def get_latest_logs(max_lines=100, queue_source=None): """从队列中获取最新的日志行,如果队列为空则直接从文件读取 Args: max_lines: 最大返回行数 + queue_source: 指定使用哪个队列,默认为LOG_QUEUE Returns: str: 日志内容 """ logs = [] + log_queue = queue_source if queue_source else LOG_QUEUE try: # 尝试从队列中获取所有可用的日志行 - while not LOG_QUEUE.empty() and len(logs) < max_lines: - logs.append(LOG_QUEUE.get_nowait()) + while not log_queue.empty() and len(logs) < max_lines: + logs.append(log_queue.get_nowait()) except queue.Empty: pass @@ -118,7 +126,34 @@ def get_latest_logs(max_lines=100): if not logs: return "暂无日志记录或日志系统未正确初始化。" - return "".join(logs) + # 格式化日志输出,确保每个日志条目有适当的换行和分隔 + formatted_logs = [] + for log in logs: + # 移除开头和结尾的多余空白字符 + log = log.strip() + + # 处理包含JSON或代码片段的日志,确保它们有正确的换行和缩进 + if '"]"\n}' in log or '\n}\n\n' in log: + # 替换不合理的换行为更清晰的格式 + log = log.replace('"]"\n}', '"]" }').replace('\n}\n\n', ' }\n') + + # 检测日期时间格式的开头,这通常表示一个新的日志条目 + # 例如:2025-03-14 18:49:31,008 - httpx - INFO + if re.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}', log): + # 在新的日志条目前添加一个空行,使日志更易读 + formatted_logs.append('\n') + + # 确保每个日志条目以换行符结束 + if not log.endswith('\n'): + log += '\n' + + formatted_logs.append(log) + + # 移除第一个可能的额外空行 + if formatted_logs and formatted_logs[0] == '\n': + formatted_logs.pop(0) + + return "".join(formatted_logs) # Dictionary containing module descriptions MODULE_DESCRIPTIONS = { @@ -209,28 +244,7 @@ FIRECRAWL_API_KEY="" #FIRECRAWL_API_URL="https://api.firecrawl.dev" """ -def format_chat_history(chat_history: List[Dict[str, str]]) -> List[List[str]]: - """将聊天历史格式化为Gradio聊天组件可接受的格式 - - Args: - chat_history: 原始聊天历史 - - Returns: - List[List[str]]: 格式化后的聊天历史 - """ - formatted_history = [] - for message in chat_history: - user_msg = message.get("user", "") - assistant_msg = message.get("assistant", "") - - if user_msg: - formatted_history.append([user_msg, None]) - if assistant_msg and formatted_history: - formatted_history[-1][1] = assistant_msg - elif assistant_msg: - formatted_history.append([None, assistant_msg]) - - return formatted_history + def validate_input(question: str) -> bool: """验证用户输入是否有效 @@ -246,7 +260,7 @@ def validate_input(question: str) -> bool: return False return True -def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], str, str]: +def run_owl(question: str, example_module: str) -> Tuple[str, str, str]: """运行OWL系统并返回结果 Args: @@ -254,23 +268,15 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s example_module: 要导入的示例模块名(如 "run_terminal_zh" 或 "run_deep") Returns: - Tuple[...]: 回答、聊天历史、令牌计数、状态 + Tuple[...]: 回答、令牌计数、状态 """ - global CURRENT_PROCESS, CONVERSATION_UPDATE_QUEUE - - # 清空对话更新队列 - while not CONVERSATION_UPDATE_QUEUE.empty(): - try: - CONVERSATION_UPDATE_QUEUE.get_nowait() - except queue.Empty: - break + global CURRENT_PROCESS # 验证输入 if not validate_input(question): logging.warning("用户提交了无效的输入") return ( "请输入有效的问题", - [], "0", "❌ 错误: 输入无效" ) @@ -285,7 +291,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"用户选择了不支持的模块: {example_module}") return ( f"所选模块 '{example_module}' 不受支持", - [], "0", f"❌ 错误: 不支持的模块" ) @@ -299,7 +304,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"无法导入模块 {module_path}: {str(ie)}") return ( f"无法导入模块: {module_path}", - [], "0", f"❌ 错误: 模块 {example_module} 不存在或无法加载 - {str(ie)}" ) @@ -307,7 +311,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"导入模块 {module_path} 时发生错误: {str(e)}") return ( f"导入模块时发生错误: {module_path}", - [], "0", f"❌ 错误: {str(e)}" ) @@ -317,7 +320,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"模块 {module_path} 中未找到 construct_society 函数") return ( f"模块 {module_path} 中未找到 construct_society 函数", - [], "0", f"❌ 错误: 模块接口不兼容" ) @@ -327,25 +329,11 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.info("正在构建社会模拟...") society = module.construct_society(question) - # 添加对话更新回调 - if hasattr(society, 'set_message_callback'): - def message_callback(role, content): - """对话消息回调函数""" - try: - # 将消息添加到队列 - CONVERSATION_UPDATE_QUEUE.put((role, content)) - logging.info(f"对话回调: {role} - {content[:30]}...") - except Exception as e: - logging.error(f"对话回调处理错误: {str(e)}") - - # 设置回调 - society.set_message_callback(message_callback) - logging.info("已设置对话更新回调") + except Exception as e: logging.error(f"构建社会模拟时发生错误: {str(e)}") return ( f"构建社会模拟时发生错误: {str(e)}", - [], "0", f"❌ 错误: 构建失败 - {str(e)}" ) @@ -359,18 +347,11 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"运行社会模拟时发生错误: {str(e)}") return ( f"运行社会模拟时发生错误: {str(e)}", - [], "0", f"❌ 错误: 运行失败 - {str(e)}" ) - # 格式化聊天历史 - try: - formatted_chat_history = format_chat_history(chat_history) - except Exception as e: - # 如果格式化失败,返回空历史记录但继续处理 - logging.error(f"格式化聊天历史时发生错误: {str(e)}") - formatted_chat_history = [] + # 安全地获取令牌计数 if not isinstance(token_info, dict): @@ -384,7 +365,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s return ( answer, - formatted_chat_history, f"完成令牌: {completion_tokens:,} | 提示令牌: {prompt_tokens:,} | 总计: {total_tokens:,}", "✅ 成功完成" ) @@ -393,7 +373,6 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s logging.error(f"处理问题时发生未捕获的错误: {str(e)}") return ( f"发生错误: {str(e)}", - [], "0", f"❌ 错误: {str(e)}" ) @@ -512,6 +491,10 @@ def create_ui(): """获取最新日志并返回给前端显示""" return get_latest_logs(100) + def update_logs2(): + """获取最新对话记录并返回给前端显示""" + return get_latest_logs(100, LOG_QUEUE2) + def clear_log_file(): """清空日志文件内容""" try: @@ -525,6 +508,12 @@ def create_ui(): LOG_QUEUE.get_nowait() except queue.Empty: break + # 清空第二个日志队列 + while not LOG_QUEUE2.empty(): + try: + LOG_QUEUE2.get_nowait() + except queue.Empty: + break return "日志文件已清空" else: return "日志文件不存在或未设置" @@ -534,181 +523,43 @@ def create_ui(): # 创建一个实时日志更新函数 def process_with_live_logs(question, module_name): - """处理问题并实时更新日志和对话历史""" - global CURRENT_PROCESS, CONVERSATION_UPDATE_QUEUE + """处理问题并实时更新日志""" + global CURRENT_PROCESS # 创建一个后台线程来处理问题 result_queue = queue.Queue() - # 创建一个队列用于实时更新对话历史 - chat_history_queue = queue.Queue() - - # 初始化对话历史,添加用户问题 - current_chat_history = [[question, None]] - - # 创建一个函数来监听日志中的对话更新 - def monitor_logs_for_chat_updates(): - """监控日志中的对话更新并将其添加到队列中""" - try: - # 创建一个单独的日志队列用于监控对话 - chat_log_queue = queue.Queue() - - # 打开日志文件进行监控 - with open(LOG_FILE, 'r', encoding='utf-8') as f: - # 移动到文件末尾 - f.seek(0, 2) - - while True: - line = f.readline() - if line: - # 尝试多种模式来检测对话信息 - - # 模式1: 检查标准的Agent对话格式 - if "Agent:" in line and ":" in line.split("Agent:")[1]: - try: - agent_part = line.split("Agent:")[1].strip() - agent_name = agent_part.split(":")[0].strip() - message = ":".join(agent_part.split(":")[1:]).strip() - - # 将对话信息添加到队列 - chat_history_queue.put((agent_name, message)) - logging.info(f"检测到对话更新(模式1): {agent_name} - {message[:30]}...") - except Exception as e: - logging.error(f"解析对话信息时出错(模式1): {str(e)}") - - # 模式2: 检查包含角色名和消息的格式 - elif " - " in line and any(role in line for role in ["用户", "助手", "系统", "User", "Assistant", "System"]): - try: - parts = line.split(" - ", 1) - if len(parts) >= 2: - # 尝试提取角色名 - log_prefix = parts[0] - message_part = parts[1] - - # 尝试从日志前缀中提取角色名 - role_candidates = ["用户", "助手", "系统", "User", "Assistant", "System"] - agent_name = None - for role in role_candidates: - if role in log_prefix: - agent_name = role - break - - if agent_name and message_part.strip(): - chat_history_queue.put((agent_name, message_part.strip())) - logging.info(f"检测到对话更新(模式2): {agent_name} - {message_part[:30]}...") - except Exception as e: - logging.error(f"解析对话信息时出错(模式2): {str(e)}") - - # 模式3: 检查JSON格式的对话记录 - elif '"role"' in line and '"content"' in line and ('"user"' in line.lower() or '"assistant"' in line.lower() or '"system"' in line.lower()): - try: - # 尝试提取JSON部分 - json_start = line.find("{") - json_end = line.rfind("}") - - if json_start >= 0 and json_end > json_start: - json_str = line[json_start:json_end+1] - message_data = json.loads(json_str) - - if "role" in message_data and "content" in message_data: - agent_name = message_data["role"].capitalize() - message = message_data["content"] - - chat_history_queue.put((agent_name, message)) - logging.info(f"检测到对话更新(模式3): {agent_name} - {message[:30]}...") - except Exception as e: - # JSON解析错误是常见的,所以这里不记录为错误 - pass - else: - # 没有新行,等待一小段时间 - time.sleep(0.1) - except Exception as e: - logging.error(f"对话监控线程出错: {str(e)}") def process_in_background(): try: result = run_owl(question, module_name) result_queue.put(result) except Exception as e: - result_queue.put((f"发生错误: {str(e)}", [], "0", f"❌ 错误: {str(e)}")) - - # 启动对话监控线程 - chat_monitor_thread = threading.Thread(target=monitor_logs_for_chat_updates, daemon=True) - chat_monitor_thread.start() + result_queue.put((f"发生错误: {str(e)}", "0", f"❌ 错误: {str(e)}")) # 启动后台处理线程 bg_thread = threading.Thread(target=process_in_background) CURRENT_PROCESS = bg_thread # 记录当前进程 bg_thread.start() - # 在等待处理完成的同时,每秒更新一次日志和对话历史 + # 在等待处理完成的同时,每秒更新一次日志 while bg_thread.is_alive(): - # 检查是否有新的对话更新(从日志解析) - updated = False - while not chat_history_queue.empty(): - try: - agent_name, message = chat_history_queue.get_nowait() - - # 如果是新的对话,添加到历史记录 - if not current_chat_history or current_chat_history[-1][1] is not None: - # 添加新的对话条目 - current_chat_history.append([f"[{agent_name}]", message]) - else: - # 更新最后一个对话的回复 - current_chat_history[-1][1] = message - - updated = True - except queue.Empty: - break - - # 检查是否有新的对话更新(从回调机制) - while not CONVERSATION_UPDATE_QUEUE.empty(): - try: - role, content = CONVERSATION_UPDATE_QUEUE.get_nowait() - - # 格式化角色名称 - if role.lower() == "user": - role_display = "用户" - elif role.lower() == "assistant": - role_display = "助手" - else: - role_display = role - - # 如果是新的对话,添加到历史记录 - if not current_chat_history or current_chat_history[-1][1] is not None: - # 添加新的对话条目 - current_chat_history.append([f"[{role_display}]", content]) - else: - # 更新最后一个对话的回复 - current_chat_history[-1][1] = content - - updated = True - logging.info(f"从回调更新对话: {role_display} - {content[:30]}...") - except queue.Empty: - break - # 更新日志显示 logs = get_latest_logs(100) + logs2 = get_latest_logs(100, LOG_QUEUE2) - # 如果有更新或者每秒都要更新,则yield新状态 - if updated or True: # 始终更新,可以根据需要调整 - yield None, current_chat_history, None, " 处理中...", logs + # 始终更新状态 + yield None, "0", " 处理中...", logs, logs2 time.sleep(1) # 处理完成,获取结果 if not result_queue.empty(): result = result_queue.get() - answer, chat_history, token_count, status = result - - # 如果有完整的聊天历史,使用它替换我们的临时历史 - if chat_history and len(chat_history) > 0: - # 但首先确保用户问题已包含在内 - if not any(item[0] == question for item in chat_history): - chat_history.insert(0, [question, None]) - current_chat_history = chat_history + answer, token_count, status = result # 最后一次更新日志 logs = get_latest_logs(100) + logs2 = get_latest_logs(100, LOG_QUEUE2) # 根据状态设置不同的指示器 if "错误" in status: @@ -716,10 +567,11 @@ def create_ui(): else: status_with_indicator = f" {status}" - yield answer, current_chat_history, token_count, status_with_indicator, logs + yield answer, token_count, status_with_indicator, logs, logs2 else: logs = get_latest_logs(100) - yield "操作未完成", current_chat_history, "0", " 已终止", logs + logs2 = get_latest_logs(100, LOG_QUEUE2) + yield "操作未完成", "0", " 已终止", logs, logs2 with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as app: gr.Markdown( @@ -741,28 +593,7 @@ def create_ui(): box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1); } - /* 用户消息样式 */ - .chat-container .user-message { - background-color: #e6f7ff; - border-radius: 18px 18px 0 18px; - padding: 10px 15px; - margin: 5px 0; - } - - /* 助手消息样式 */ - .chat-container .assistant-message { - background-color: #f0f0f0; - border-radius: 18px 18px 18px 0; - padding: 10px 15px; - margin: 5px 0; - } - - /* 角色名称样式 */ - .chat-container .role-name { - font-weight: bold; - margin-bottom: 5px; - } - + /* 改进标签页样式 */ .tabs .tab-nav { background-color: #f5f5f5; @@ -815,26 +646,7 @@ def create_ui(): line-height: 1.4; } - /* 聊天头像样式 */ - .chat-container .avatar { - display: flex !important; - align-items: center; - justify-content: center; - width: 40px !important; - height: 40px !important; - border-radius: 50%; - font-size: 20px; - margin-right: 10px; - } - - .chat-container .avatar.user { - background-color: #e6f7ff; - } - - .chat-container .avatar.assistant { - background-color: #f0f0f0; - } - + @keyframes pulse { 0% { opacity: 1; } 50% { opacity: 0.5; } @@ -885,7 +697,29 @@ def create_ui(): - with gr.Tabs(): + with gr.Tabs(selected=1): # 设置对话记录为默认选中的标签页 + with gr.TabItem("对话记录"): + # 添加对话记录显示区域 + log_display2 = gr.Textbox( + label="对话记录", + lines=25, + max_lines=100, + interactive=False, + autoscroll=True, + show_copy_button=True, + elem_classes="log-display", + container=True + ) + + with gr.Row(): + refresh_logs_button2 = gr.Button("刷新记录") + auto_refresh_checkbox2 = gr.Checkbox( + label="自动刷新", + value=True, + interactive=True + ) + clear_logs_button2 = gr.Button("清空记录", variant="secondary") + with gr.TabItem("系统日志"): # 添加日志显示区域 log_display = gr.Textbox( @@ -914,38 +748,7 @@ def create_ui(): elem_classes="answer-box" ) - with gr.TabItem("对话历史", id="chat-history-tab"): - chat_output = gr.Chatbot( - label="完整对话记录", - elem_classes="chat-container", - height=500, - - bubble_full_width=False, # 气泡不占满宽度 - show_copy_button=True # 显示复制按钮 - ) - - # 添加自动滚动到底部的JavaScript - gr.HTML(""" - - """) + @@ -1047,7 +850,7 @@ def create_ui(): # 示例问题 examples = [ "打开百度搜索,总结一下camel-ai的camel框架的github star、fork数目等,并把数字用plot包写成python文件保存到本地,用本地终端执行python文件显示图出来给我", - "请分析GitHub上CAMEL-AI项目的最新统计数据。找出该项目的星标数量、贡献者数量和最近的活跃度。", + "请分析GitHub上CAMEL-AI项目的最新统计数据。找出该项目的星标数量、贡献者名称,把内容整理成一个markdown文件保存到本地", "浏览亚马逊并找出一款对程序员有吸引力的产品。请提供产品名称和价格", "写一个hello world的python文件,保存到本地", @@ -1075,7 +878,7 @@ def create_ui(): run_button.click( fn=process_with_live_logs, inputs=[question_input, module_dropdown], - outputs=[answer_output, chat_output, token_count_output, status_output, log_display] + outputs=[answer_output, token_count_output, status_output, log_display, log_display2] ) # 模块选择更新描述 @@ -1091,11 +894,21 @@ def create_ui(): outputs=[log_display] ) + refresh_logs_button2.click( + fn=update_logs2, + outputs=[log_display2] + ) + clear_logs_button.click( fn=clear_log_file, outputs=[log_display] ) + clear_logs_button2.click( + fn=clear_log_file, + outputs=[log_display2] + ) + # 自动刷新控制 def toggle_auto_refresh(enabled): if enabled: @@ -1109,6 +922,12 @@ def create_ui(): outputs=[log_display] ) + auto_refresh_checkbox2.change( + fn=toggle_auto_refresh, + inputs=[auto_refresh_checkbox2], + outputs=[log_display2] + ) + # 设置自动刷新(默认每3秒刷新一次) if auto_refresh_checkbox.value: app.load( @@ -1116,6 +935,13 @@ def create_ui(): outputs=[log_display], every=2 ) + + if auto_refresh_checkbox2.value: + app.load( + fn=update_logs2, + outputs=[log_display2], + every=2 + ) return app