diff --git a/owl/webapp_zh.py b/owl/webapp_zh.py index c7b2374..f6ba0b2 100644 --- a/owl/webapp_zh.py +++ b/owl/webapp_zh.py @@ -12,6 +12,10 @@ from dotenv import load_dotenv, set_key, find_dotenv, unset_key import threading import queue import time +import signal +import sys +import subprocess +import platform os.environ['PYTHONIOENCODING'] = 'utf-8' @@ -55,6 +59,8 @@ def setup_logging(): LOG_FILE = None LOG_QUEUE = queue.Queue() STOP_LOG_THREAD = threading.Event() +CURRENT_PROCESS = None # 用于跟踪当前运行的进程 +STOP_REQUESTED = threading.Event() # 用于标记是否请求停止 # 日志读取和更新函数 def log_reader_thread(log_file): @@ -239,6 +245,59 @@ def validate_input(question: str) -> bool: return False return True +# 终止运行进程的函数 +def terminate_process(): + """终止当前运行的进程,适配不同操作系统平台""" + global CURRENT_PROCESS, STOP_REQUESTED + + if CURRENT_PROCESS is None: + logging.info("没有正在运行的进程") + return "没有正在运行的进程", "✅ 已就绪" + + try: + STOP_REQUESTED.set() # 设置停止标志 + logging.info("正在尝试终止进程...") + + # 获取当前操作系统 + current_os = platform.system() + + if current_os == "Windows": + # Windows平台使用taskkill强制终止进程树 + if hasattr(CURRENT_PROCESS, 'pid'): + subprocess.run(f"taskkill /F /T /PID {CURRENT_PROCESS.pid}", shell=True) + logging.info(f"已发送Windows终止命令到进程 {CURRENT_PROCESS.pid}") + else: + # Unix-like系统 (Linux, macOS) + if hasattr(CURRENT_PROCESS, 'pid'): + # 发送SIGTERM信号 + os.killpg(os.getpgid(CURRENT_PROCESS.pid), signal.SIGTERM) + logging.info(f"已发送SIGTERM信号到进程组 {CURRENT_PROCESS.pid}") + + # 给进程一些时间来清理 + time.sleep(0.5) + + # 如果进程仍在运行,发送SIGKILL + try: + if CURRENT_PROCESS.poll() is None: + os.killpg(os.getpgid(CURRENT_PROCESS.pid), signal.SIGKILL) + logging.info(f"已发送SIGKILL信号到进程组 {CURRENT_PROCESS.pid}") + except (ProcessLookupError, OSError): + pass # 进程可能已经终止 + + # 如果是线程,尝试终止线程 + if isinstance(CURRENT_PROCESS, threading.Thread) and CURRENT_PROCESS.is_alive(): + # 线程无法强制终止,但可以设置标志让线程自行退出 + logging.info("等待线程终止...") + CURRENT_PROCESS.join(timeout=2) + + CURRENT_PROCESS = None + logging.info("进程已终止") + return "进程已终止", "✅ 已就绪" + + except Exception as e: + logging.error(f"终止进程时出错: {str(e)}") + return f"终止进程时出错: {str(e)}", f"❌ 错误: {str(e)}" + def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], str, str]: """运行OWL系统并返回结果 @@ -249,6 +308,11 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s Returns: Tuple[...]: 回答、聊天历史、令牌计数、状态 """ + global CURRENT_PROCESS, STOP_REQUESTED + + # 重置停止标志 + STOP_REQUESTED.clear() + # 验证输入 if not validate_input(question): logging.warning("用户提交了无效的输入") @@ -488,34 +552,65 @@ def create_ui(): # 创建一个实时日志更新函数 def process_with_live_logs(question, module_name): """处理问题并实时更新日志""" + global CURRENT_PROCESS, STOP_REQUESTED + # 创建一个后台线程来处理问题 result_queue = queue.Queue() def process_in_background(): try: + # 检查是否已请求停止 + if STOP_REQUESTED.is_set(): + result_queue.put((f"操作已取消", [], "0", f"❌ 操作已取消")) + return + result = run_owl(question, module_name) + + # 再次检查是否已请求停止 + if STOP_REQUESTED.is_set(): + result_queue.put((f"操作已取消", [], "0", f"❌ 操作已取消")) + return + result_queue.put(result) except Exception as e: result_queue.put((f"发生错误: {str(e)}", [], "0", f"❌ 错误: {str(e)}")) # 启动后台处理线程 bg_thread = threading.Thread(target=process_in_background) + CURRENT_PROCESS = bg_thread # 记录当前进程 bg_thread.start() # 在等待处理完成的同时,每秒更新一次日志 while bg_thread.is_alive(): + # 检查是否已请求停止 + if STOP_REQUESTED.is_set(): + logs = get_latest_logs(100) + yield None, None, None, "⏹️ 正在终止...", logs + break + # 更新日志显示 logs = get_latest_logs(100) yield None, None, None, "⏳ 处理中...", logs time.sleep(1) - # 处理完成,获取结果 - result = result_queue.get() - answer, chat_history, token_count, status = result + # 如果已请求停止但线程仍在运行 + if STOP_REQUESTED.is_set() and bg_thread.is_alive(): + bg_thread.join(timeout=2) # 等待线程最多2秒 + logs = get_latest_logs(100) + yield "操作已取消", [], "0", "⏹️ 已终止", logs + return - # 最后一次更新日志 - logs = get_latest_logs(100) - yield answer, chat_history, token_count, status, logs + # 处理完成,获取结果 + if not result_queue.empty(): + result = result_queue.get() + answer, chat_history, token_count, status = result + + # 最后一次更新日志 + logs = get_latest_logs(100) + yield answer, chat_history, token_count, status, logs + else: + logs = get_latest_logs(100) + yield "操作已取消或未完成", [], "0", "⏹️ 已终止", logs with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as app: gr.Markdown( @@ -553,7 +648,9 @@ def create_ui(): elem_classes="module-info" ) - run_button = gr.Button("运行", variant="primary", elem_classes="primary") + with gr.Row(): + run_button = gr.Button("运行", variant="primary", elem_classes="primary") + stop_button = gr.Button("停止", variant="stop", elem_classes="stop") status_output = gr.Textbox(label="状态", interactive=False) token_count_output = gr.Textbox( @@ -729,6 +826,12 @@ def create_ui(): outputs=[answer_output, chat_output, token_count_output, status_output, log_display] ) + # 添加停止按钮事件处理 + stop_button.click( + fn=terminate_process, + outputs=[answer_output, status_output] + ) + # 模块选择更新描述 module_dropdown.change( fn=update_module_description, @@ -789,7 +892,11 @@ def main(): # 注册应用关闭时的清理函数 def cleanup(): + global STOP_LOG_THREAD, STOP_REQUESTED STOP_LOG_THREAD.set() + STOP_REQUESTED.set() + # 尝试终止当前进程 + terminate_process() logging.info("应用程序关闭,停止日志线程") app.launch(share=False,enable_queue=True,server_name="127.0.0.1",server_port=7860) @@ -802,6 +909,9 @@ def main(): finally: # 确保日志线程停止 STOP_LOG_THREAD.set() + STOP_REQUESTED.set() + # 尝试终止当前进程 + terminate_process() logging.info("应用程序关闭") if __name__ == "__main__":