diff --git a/owl/webapp_zh.py b/owl/webapp_zh.py index f6ba0b2..29998d0 100644 --- a/owl/webapp_zh.py +++ b/owl/webapp_zh.py @@ -61,6 +61,7 @@ LOG_QUEUE = queue.Queue() STOP_LOG_THREAD = threading.Event() CURRENT_PROCESS = None # 用于跟踪当前运行的进程 STOP_REQUESTED = threading.Event() # 用于标记是否请求停止 +CONVERSATION_UPDATE_QUEUE = queue.Queue() # 用于实时更新对话历史的队列 # 日志读取和更新函数 def log_reader_thread(log_file): @@ -252,7 +253,7 @@ def terminate_process(): if CURRENT_PROCESS is None: logging.info("没有正在运行的进程") - return "没有正在运行的进程", "✅ 已就绪" + return "没有正在运行的进程", " 已就绪" try: STOP_REQUESTED.set() # 设置停止标志 @@ -292,11 +293,11 @@ def terminate_process(): CURRENT_PROCESS = None logging.info("进程已终止") - return "进程已终止", "✅ 已就绪" + return "进程已终止", " 已就绪" except Exception as e: logging.error(f"终止进程时出错: {str(e)}") - return f"终止进程时出错: {str(e)}", f"❌ 错误: {str(e)}" + return f"终止进程时出错: {str(e)}", f" 错误: {str(e)}" def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], str, str]: """运行OWL系统并返回结果 @@ -308,7 +309,14 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s Returns: Tuple[...]: 回答、聊天历史、令牌计数、状态 """ - global CURRENT_PROCESS, STOP_REQUESTED + global CURRENT_PROCESS, STOP_REQUESTED, CONVERSATION_UPDATE_QUEUE + + # 清空对话更新队列 + while not CONVERSATION_UPDATE_QUEUE.empty(): + try: + CONVERSATION_UPDATE_QUEUE.get_nowait() + except queue.Empty: + break # 重置停止标志 STOP_REQUESTED.clear() @@ -374,6 +382,21 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s try: logging.info("正在构建社会模拟...") society = module.construct_society(question) + + # 添加对话更新回调 + if hasattr(society, 'set_message_callback'): + def message_callback(role, content): + """对话消息回调函数""" + try: + # 将消息添加到队列 + CONVERSATION_UPDATE_QUEUE.put((role, content)) + logging.info(f"对话回调: {role} - {content[:30]}...") + except Exception as e: + logging.error(f"对话回调处理错误: {str(e)}") + + # 设置回调 + society.set_message_callback(message_callback) + logging.info("已设置对话更新回调") except Exception as e: logging.error(f"构建社会模拟时发生错误: {str(e)}") return ( @@ -551,11 +574,95 @@ def create_ui(): # 创建一个实时日志更新函数 def process_with_live_logs(question, module_name): - """处理问题并实时更新日志""" - global CURRENT_PROCESS, STOP_REQUESTED + """处理问题并实时更新日志和对话历史""" + global CURRENT_PROCESS, STOP_REQUESTED, CONVERSATION_UPDATE_QUEUE # 创建一个后台线程来处理问题 result_queue = queue.Queue() + # 创建一个队列用于实时更新对话历史 + chat_history_queue = queue.Queue() + + # 初始化对话历史,添加用户问题 + current_chat_history = [[question, None]] + + # 创建一个函数来监听日志中的对话更新 + def monitor_logs_for_chat_updates(): + """监控日志中的对话更新并将其添加到队列中""" + try: + # 创建一个单独的日志队列用于监控对话 + chat_log_queue = queue.Queue() + + # 打开日志文件进行监控 + with open(LOG_FILE, 'r', encoding='utf-8') as f: + # 移动到文件末尾 + f.seek(0, 2) + + while not STOP_REQUESTED.is_set(): + line = f.readline() + if line: + # 尝试多种模式来检测对话信息 + + # 模式1: 检查标准的Agent对话格式 + if "Agent:" in line and ":" in line.split("Agent:")[1]: + try: + agent_part = line.split("Agent:")[1].strip() + agent_name = agent_part.split(":")[0].strip() + message = ":".join(agent_part.split(":")[1:]).strip() + + # 将对话信息添加到队列 + chat_history_queue.put((agent_name, message)) + logging.info(f"检测到对话更新(模式1): {agent_name} - {message[:30]}...") + except Exception as e: + logging.error(f"解析对话信息时出错(模式1): {str(e)}") + + # 模式2: 检查包含角色名和消息的格式 + elif " - " in line and any(role in line for role in ["用户", "助手", "系统", "User", "Assistant", "System"]): + try: + parts = line.split(" - ", 1) + if len(parts) >= 2: + # 尝试提取角色名 + log_prefix = parts[0] + message_part = parts[1] + + # 尝试从日志前缀中提取角色名 + role_candidates = ["用户", "助手", "系统", "User", "Assistant", "System"] + agent_name = None + for role in role_candidates: + if role in log_prefix: + agent_name = role + break + + if agent_name and message_part.strip(): + chat_history_queue.put((agent_name, message_part.strip())) + logging.info(f"检测到对话更新(模式2): {agent_name} - {message_part[:30]}...") + except Exception as e: + logging.error(f"解析对话信息时出错(模式2): {str(e)}") + + # 模式3: 检查JSON格式的对话记录 + elif '"role"' in line and '"content"' in line and ('"user"' in line.lower() or '"assistant"' in line.lower() or '"system"' in line.lower()): + try: + # 尝试提取JSON部分 + json_start = line.find("{") + json_end = line.rfind("}") + + if json_start >= 0 and json_end > json_start: + json_str = line[json_start:json_end+1] + message_data = json.loads(json_str) + + if "role" in message_data and "content" in message_data: + agent_name = message_data["role"].capitalize() + message = message_data["content"] + + chat_history_queue.put((agent_name, message)) + logging.info(f"检测到对话更新(模式3): {agent_name} - {message[:30]}...") + except Exception as e: + # JSON解析错误是常见的,所以这里不记录为错误 + pass + else: + # 没有新行,等待一小段时间 + time.sleep(0.1) + except Exception as e: + logging.error(f"对话监控线程出错: {str(e)}") def process_in_background(): try: @@ -575,29 +682,81 @@ def create_ui(): except Exception as e: result_queue.put((f"发生错误: {str(e)}", [], "0", f"❌ 错误: {str(e)}")) + # 启动对话监控线程 + chat_monitor_thread = threading.Thread(target=monitor_logs_for_chat_updates, daemon=True) + chat_monitor_thread.start() + # 启动后台处理线程 bg_thread = threading.Thread(target=process_in_background) CURRENT_PROCESS = bg_thread # 记录当前进程 bg_thread.start() - # 在等待处理完成的同时,每秒更新一次日志 + # 在等待处理完成的同时,每秒更新一次日志和对话历史 while bg_thread.is_alive(): # 检查是否已请求停止 if STOP_REQUESTED.is_set(): logs = get_latest_logs(100) - yield None, None, None, "⏹️ 正在终止...", logs + yield None, current_chat_history, None, " 正在终止...", logs break - + + # 检查是否有新的对话更新(从日志解析) + updated = False + while not chat_history_queue.empty(): + try: + agent_name, message = chat_history_queue.get_nowait() + + # 如果是新的对话,添加到历史记录 + if not current_chat_history or current_chat_history[-1][1] is not None: + # 添加新的对话条目 + current_chat_history.append([f"[{agent_name}]", message]) + else: + # 更新最后一个对话的回复 + current_chat_history[-1][1] = message + + updated = True + except queue.Empty: + break + + # 检查是否有新的对话更新(从回调机制) + while not CONVERSATION_UPDATE_QUEUE.empty(): + try: + role, content = CONVERSATION_UPDATE_QUEUE.get_nowait() + + # 格式化角色名称 + if role.lower() == "user": + role_display = "用户" + elif role.lower() == "assistant": + role_display = "助手" + else: + role_display = role + + # 如果是新的对话,添加到历史记录 + if not current_chat_history or current_chat_history[-1][1] is not None: + # 添加新的对话条目 + current_chat_history.append([f"[{role_display}]", content]) + else: + # 更新最后一个对话的回复 + current_chat_history[-1][1] = content + + updated = True + logging.info(f"从回调更新对话: {role_display} - {content[:30]}...") + except queue.Empty: + break + # 更新日志显示 logs = get_latest_logs(100) - yield None, None, None, "⏳ 处理中...", logs + + # 如果有更新或者每秒都要更新,则yield新状态 + if updated or True: # 始终更新,可以根据需要调整 + yield None, current_chat_history, None, " 处理中...", logs + time.sleep(1) # 如果已请求停止但线程仍在运行 if STOP_REQUESTED.is_set() and bg_thread.is_alive(): bg_thread.join(timeout=2) # 等待线程最多2秒 logs = get_latest_logs(100) - yield "操作已取消", [], "0", "⏹️ 已终止", logs + yield "操作已取消", current_chat_history, "0", " 已终止", logs return # 处理完成,获取结果 @@ -605,12 +764,26 @@ def create_ui(): result = result_queue.get() answer, chat_history, token_count, status = result + # 如果有完整的聊天历史,使用它替换我们的临时历史 + if chat_history and len(chat_history) > 0: + # 但首先确保用户问题已包含在内 + if not any(item[0] == question for item in chat_history): + chat_history.insert(0, [question, None]) + current_chat_history = chat_history + # 最后一次更新日志 logs = get_latest_logs(100) - yield answer, chat_history, token_count, status, logs + + # 根据状态设置不同的指示器 + if "错误" in status: + status_with_indicator = f" {status}" + else: + status_with_indicator = f" {status}" + + yield answer, current_chat_history, token_count, status_with_indicator, logs else: logs = get_latest_logs(100) - yield "操作已取消或未完成", [], "0", "⏹️ 已终止", logs + yield "操作已取消或未完成", current_chat_history, "0", " 已终止", logs with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as app: gr.Markdown( @@ -621,6 +794,88 @@ def create_ui(): """ ) + # 添加自定义CSS + gr.HTML(""" + + """) + with gr.Row(): with gr.Column(scale=1): question_input = gr.Textbox( @@ -652,7 +907,10 @@ def create_ui(): run_button = gr.Button("运行", variant="primary", elem_classes="primary") stop_button = gr.Button("停止", variant="stop", elem_classes="stop") - status_output = gr.Textbox(label="状态", interactive=False) + status_output = gr.HTML( + value=" 已就绪", + label="状态" + ) token_count_output = gr.Textbox( label="令牌计数", interactive=False, @@ -669,12 +927,38 @@ def create_ui(): elem_classes="answer-box" ) - with gr.TabItem("对话历史"): + with gr.TabItem("对话历史", id="chat-history-tab"): chat_output = gr.Chatbot( label="完整对话记录", elem_classes="chat-container", - height=500 + height=500, + avatar_images=("👤", "🦉"), # 添加用户和助手的头像 + bubble_full_width=False, # 气泡不占满宽度 + show_copy_button=True # 显示复制按钮 ) + + # 添加自动滚动到底部的JavaScript + gr.HTML(""" + + """) with gr.TabItem("系统日志"): # 添加日志显示区域