This commit is contained in:
zjrwtx
2025-03-14 11:23:49 +08:00
parent b4f9c30762
commit 1bd2246343

View File

@@ -12,6 +12,10 @@ from dotenv import load_dotenv, set_key, find_dotenv, unset_key
import threading
import queue
import time
import signal
import sys
import subprocess
import platform
os.environ['PYTHONIOENCODING'] = 'utf-8'
@@ -55,6 +59,8 @@ def setup_logging():
LOG_FILE = None
LOG_QUEUE = queue.Queue()
STOP_LOG_THREAD = threading.Event()
CURRENT_PROCESS = None # 用于跟踪当前运行的进程
STOP_REQUESTED = threading.Event() # 用于标记是否请求停止
# 日志读取和更新函数
def log_reader_thread(log_file):
@@ -239,6 +245,59 @@ def validate_input(question: str) -> bool:
return False
return True
# 终止运行进程的函数
def terminate_process():
"""终止当前运行的进程,适配不同操作系统平台"""
global CURRENT_PROCESS, STOP_REQUESTED
if CURRENT_PROCESS is None:
logging.info("没有正在运行的进程")
return "没有正在运行的进程", "✅ 已就绪"
try:
STOP_REQUESTED.set() # 设置停止标志
logging.info("正在尝试终止进程...")
# 获取当前操作系统
current_os = platform.system()
if current_os == "Windows":
# Windows平台使用taskkill强制终止进程树
if hasattr(CURRENT_PROCESS, 'pid'):
subprocess.run(f"taskkill /F /T /PID {CURRENT_PROCESS.pid}", shell=True)
logging.info(f"已发送Windows终止命令到进程 {CURRENT_PROCESS.pid}")
else:
# Unix-like系统 (Linux, macOS)
if hasattr(CURRENT_PROCESS, 'pid'):
# 发送SIGTERM信号
os.killpg(os.getpgid(CURRENT_PROCESS.pid), signal.SIGTERM)
logging.info(f"已发送SIGTERM信号到进程组 {CURRENT_PROCESS.pid}")
# 给进程一些时间来清理
time.sleep(0.5)
# 如果进程仍在运行发送SIGKILL
try:
if CURRENT_PROCESS.poll() is None:
os.killpg(os.getpgid(CURRENT_PROCESS.pid), signal.SIGKILL)
logging.info(f"已发送SIGKILL信号到进程组 {CURRENT_PROCESS.pid}")
except (ProcessLookupError, OSError):
pass # 进程可能已经终止
# 如果是线程,尝试终止线程
if isinstance(CURRENT_PROCESS, threading.Thread) and CURRENT_PROCESS.is_alive():
# 线程无法强制终止,但可以设置标志让线程自行退出
logging.info("等待线程终止...")
CURRENT_PROCESS.join(timeout=2)
CURRENT_PROCESS = None
logging.info("进程已终止")
return "进程已终止", "✅ 已就绪"
except Exception as e:
logging.error(f"终止进程时出错: {str(e)}")
return f"终止进程时出错: {str(e)}", f"❌ 错误: {str(e)}"
def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], str, str]:
"""运行OWL系统并返回结果
@@ -249,6 +308,11 @@ def run_owl(question: str, example_module: str) -> Tuple[str, List[List[str]], s
Returns:
Tuple[...]: 回答、聊天历史、令牌计数、状态
"""
global CURRENT_PROCESS, STOP_REQUESTED
# 重置停止标志
STOP_REQUESTED.clear()
# 验证输入
if not validate_input(question):
logging.warning("用户提交了无效的输入")
@@ -488,34 +552,65 @@ def create_ui():
# 创建一个实时日志更新函数
def process_with_live_logs(question, module_name):
"""处理问题并实时更新日志"""
global CURRENT_PROCESS, STOP_REQUESTED
# 创建一个后台线程来处理问题
result_queue = queue.Queue()
def process_in_background():
try:
# 检查是否已请求停止
if STOP_REQUESTED.is_set():
result_queue.put((f"操作已取消", [], "0", f"❌ 操作已取消"))
return
result = run_owl(question, module_name)
# 再次检查是否已请求停止
if STOP_REQUESTED.is_set():
result_queue.put((f"操作已取消", [], "0", f"❌ 操作已取消"))
return
result_queue.put(result)
except Exception as e:
result_queue.put((f"发生错误: {str(e)}", [], "0", f"❌ 错误: {str(e)}"))
# 启动后台处理线程
bg_thread = threading.Thread(target=process_in_background)
CURRENT_PROCESS = bg_thread # 记录当前进程
bg_thread.start()
# 在等待处理完成的同时,每秒更新一次日志
while bg_thread.is_alive():
# 检查是否已请求停止
if STOP_REQUESTED.is_set():
logs = get_latest_logs(100)
yield None, None, None, "⏹️ 正在终止...", logs
break
# 更新日志显示
logs = get_latest_logs(100)
yield None, None, None, "⏳ 处理中...", logs
time.sleep(1)
# 处理完成,获取结果
result = result_queue.get()
answer, chat_history, token_count, status = result
# 如果已请求停止但线程仍在运行
if STOP_REQUESTED.is_set() and bg_thread.is_alive():
bg_thread.join(timeout=2) # 等待线程最多2秒
logs = get_latest_logs(100)
yield "操作已取消", [], "0", "⏹️ 已终止", logs
return
# 最后一次更新日志
logs = get_latest_logs(100)
yield answer, chat_history, token_count, status, logs
# 处理完成,获取结果
if not result_queue.empty():
result = result_queue.get()
answer, chat_history, token_count, status = result
# 最后一次更新日志
logs = get_latest_logs(100)
yield answer, chat_history, token_count, status, logs
else:
logs = get_latest_logs(100)
yield "操作已取消或未完成", [], "0", "⏹️ 已终止", logs
with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue")) as app:
gr.Markdown(
@@ -553,7 +648,9 @@ def create_ui():
elem_classes="module-info"
)
run_button = gr.Button("运行", variant="primary", elem_classes="primary")
with gr.Row():
run_button = gr.Button("运行", variant="primary", elem_classes="primary")
stop_button = gr.Button("停止", variant="stop", elem_classes="stop")
status_output = gr.Textbox(label="状态", interactive=False)
token_count_output = gr.Textbox(
@@ -729,6 +826,12 @@ def create_ui():
outputs=[answer_output, chat_output, token_count_output, status_output, log_display]
)
# 添加停止按钮事件处理
stop_button.click(
fn=terminate_process,
outputs=[answer_output, status_output]
)
# 模块选择更新描述
module_dropdown.change(
fn=update_module_description,
@@ -789,7 +892,11 @@ def main():
# 注册应用关闭时的清理函数
def cleanup():
global STOP_LOG_THREAD, STOP_REQUESTED
STOP_LOG_THREAD.set()
STOP_REQUESTED.set()
# 尝试终止当前进程
terminate_process()
logging.info("应用程序关闭,停止日志线程")
app.launch(share=False,enable_queue=True,server_name="127.0.0.1",server_port=7860)
@@ -802,6 +909,9 @@ def main():
finally:
# 确保日志线程停止
STOP_LOG_THREAD.set()
STOP_REQUESTED.set()
# 尝试终止当前进程
terminate_process()
logging.info("应用程序关闭")
if __name__ == "__main__":