fix input question

This commit is contained in:
yifeng.wang
2025-03-10 00:39:08 +08:00
parent 7cd436d559
commit e38ce058b3
3 changed files with 207 additions and 224 deletions

View File

@@ -109,7 +109,8 @@ ENV_GROUPS = {
"required": False,
"help": "Firecrawl API密钥用于网页爬取功能。获取方式https://www.firecrawl.dev/"
},
]
],
"自定义环境变量": [] # 用户自定义的环境变量将存储在这里
}
def get_script_info(script_name):
@@ -127,6 +128,34 @@ def load_env_vars():
for var in group:
env_vars[var["name"]] = os.environ.get(var["name"], "")
# 加载.env文件中可能存在的其他环境变量
if Path(".env").exists():
with open(".env", "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
key = key.strip()
value = value.strip().strip('"\'')
# 检查是否是已知的环境变量
known_var = False
for group in ENV_GROUPS.values():
if any(var["name"] == key for var in group):
known_var = True
break
# 如果不是已知的环境变量,添加到自定义环境变量组
if not known_var and key not in env_vars:
ENV_GROUPS["自定义环境变量"].append({
"name": key,
"label": key,
"type": "text",
"required": False,
"help": "用户自定义环境变量"
})
env_vars[key] = value
return env_vars
def save_env_vars(env_vars):
@@ -160,6 +189,32 @@ def save_env_vars(env_vars):
return "✅ 环境变量已保存"
def add_custom_env_var(name, value, var_type):
"""添加自定义环境变量"""
if not name:
return "❌ 环境变量名不能为空", None
# 检查是否已存在同名环境变量
for group in ENV_GROUPS.values():
if any(var["name"] == name for var in group):
return f"❌ 环境变量 {name} 已存在", None
# 添加到自定义环境变量组
ENV_GROUPS["自定义环境变量"].append({
"name": name,
"label": name,
"type": var_type,
"required": False,
"help": "用户自定义环境变量"
})
# 保存环境变量
env_vars = {name: value}
save_env_vars(env_vars)
# 返回成功消息和更新后的环境变量组
return f"✅ 已添加环境变量 {name}", ENV_GROUPS["自定义环境变量"]
def terminate_process():
"""终止当前运行的进程"""
global current_process
@@ -330,29 +385,6 @@ def extract_chat_history(logs):
pass
return None
def modify_script(script_name, question):
"""修改脚本以使用提供的问题"""
script_path = os.path.join("owl", script_name)
with open(script_path, "r", encoding="utf-8") as f:
content = f.read()
# 查找并替换问题变量
if "question = " in content:
# 使用正则表达式替换问题字符串
modified_content = re.sub(
r'question\s*=\s*["\'].*?["\']',
f'question = "{question}"',
content
)
with open(script_path, "w", encoding="utf-8") as f:
f.write(modified_content)
return True
return False
def create_ui():
"""创建Gradio界面"""
# 加载环境变量
@@ -434,32 +466,62 @@ def create_ui():
env_inputs = {}
save_status = gr.Textbox(label="保存状态", interactive=False)
# 添加自定义环境变量部分
with gr.Accordion("添加自定义环境变量", open=True):
with gr.Row():
new_var_name = gr.Textbox(label="环境变量名", placeholder="例如MY_CUSTOM_API_KEY")
new_var_value = gr.Textbox(label="环境变量值", placeholder="输入值")
new_var_type = gr.Dropdown(
choices=["text", "password"],
value="text",
label="类型"
)
add_var_button = gr.Button("添加环境变量", variant="primary")
add_var_status = gr.Textbox(label="添加状态", interactive=False)
# 自定义环境变量列表
custom_vars_list = gr.JSON(
value=ENV_GROUPS["自定义环境变量"],
label="已添加的自定义环境变量",
visible=len(ENV_GROUPS["自定义环境变量"]) > 0
)
# 添加环境变量按钮点击事件
add_var_button.click(
fn=add_custom_env_var,
inputs=[new_var_name, new_var_value, new_var_type],
outputs=[add_var_status, custom_vars_list]
)
# 现有环境变量配置
for group_name, vars in ENV_GROUPS.items():
with gr.Accordion(group_name, open=True):
for var in vars:
# 添加帮助信息
gr.Markdown(f"**{var['help']}**")
if var["type"] == "password":
env_inputs[var["name"]] = gr.Textbox(
value=env_vars.get(var["name"], ""),
label=var["label"] + (" (必填)" if var.get("required", False) else ""),
placeholder=f"请输入{var['label']}",
type="password"
)
else:
env_inputs[var["name"]] = gr.Textbox(
value=env_vars.get(var["name"], ""),
label=var["label"] + (" (必填)" if var.get("required", False) else ""),
placeholder=f"请输入{var['label']}"
)
if group_name != "自定义环境变量" or len(vars) > 0: # 只显示非空的自定义环境变量组
with gr.Accordion(group_name, open=(group_name != "自定义环境变量")):
for var in vars:
# 添加帮助信息
gr.Markdown(f"**{var['help']}**")
if var["type"] == "password":
env_inputs[var["name"]] = gr.Textbox(
value=env_vars.get(var["name"], ""),
label=var["label"] + (" (必填)" if var.get("required", False) else ""),
placeholder=f"请输入{var['label']}",
type="password"
)
else:
env_inputs[var["name"]] = gr.Textbox(
value=env_vars.get(var["name"], ""),
label=var["label"] + (" (必填)" if var.get("required", False) else ""),
placeholder=f"请输入{var['label']}"
)
save_button = gr.Button("保存环境变量", variant="primary")
# 保存环境变量
save_inputs = [env_inputs[var_name] for group in ENV_GROUPS.values() for var in group for var_name in [var["name"]]]
save_inputs = [env_inputs[var_name] for group in ENV_GROUPS.values() for var in group for var_name in [var["name"]] if var_name in env_inputs]
save_button.click(
fn=lambda *values: save_env_vars(dict(zip([var["name"] for group in ENV_GROUPS.values() for var in group], values))),
fn=lambda *values: save_env_vars(dict(zip([var["name"] for group in ENV_GROUPS.values() for var in group if var["name"] in env_inputs], values))),
inputs=save_inputs,
outputs=save_status
)
@@ -494,6 +556,7 @@ def create_ui():
- 在"运行日志"标签页查看完整日志
- 在"聊天历史"标签页查看对话历史(如果有)
- 在"环境变量配置"标签页配置API密钥和其他环境变量
- 您可以添加自定义环境变量,满足特殊需求
### ⚠️ 注意事项

View File

@@ -3,6 +3,7 @@ import sys
import importlib.util
import re
from pathlib import Path
import traceback
def load_module_from_path(module_name, file_path):
"""从文件路径加载Python模块"""
@@ -33,57 +34,81 @@ def run_script_with_env_question(script_name):
# 检查脚本是否有main函数
has_main = re.search(r'def\s+main\s*\(\s*\)\s*:', content) is not None
# 尝试查找并替换question变量
# 匹配多种可能的question定义模式
patterns = [
r'question\s*=\s*["\'].*?["\']', # 简单字符串赋值
r'question\s*=\s*\(\s*["\'].*?["\']\s*\)', # 带括号的字符串赋值
r'question\s*=\s*f["\'].*?["\']', # f-string赋值
r'question\s*=\s*r["\'].*?["\']', # 原始字符串赋值
]
# 转义问题中的特殊字符
escaped_question = question.replace("\\", "\\\\").replace("\"", "\\\"").replace("'", "\\'")
question_replaced = False
for pattern in patterns:
if re.search(pattern, content):
# 转义问题中的特殊字符
escaped_question = question.replace("\\", "\\\\").replace("\"", "\\\"").replace("'", "\\'")
# 替换问题
modified_content = re.sub(
pattern,
f'question = "{escaped_question}"',
content
# 查找脚本中所有的question赋值
question_assignments = re.findall(r'question\s*=\s*(?:["\'].*?["\']|\(.*?\))', content)
print(f"在脚本中找到 {len(question_assignments)} 个question赋值")
# 修改脚本内容替换所有的question赋值
modified_content = content
# 如果脚本中有question赋值替换所有的赋值
if question_assignments:
for assignment in question_assignments:
modified_content = modified_content.replace(
assignment,
f'question = "{escaped_question}"'
)
question_replaced = True
break
if not question_replaced:
# 如果没有找到question变量尝试在main函数前插入
print(f"已替换脚本中的所有question赋值为: {question}")
else:
# 如果没有找到question赋值尝试在main函数前插入
if has_main:
# 在main函数前插入question变量
main_match = re.search(r'def\s+main\s*\(\s*\)\s*:', content)
if main_match:
insert_pos = main_match.start()
# 转义问题中的特殊字符
escaped_question = question.replace("\\", "\\\\").replace("\"", "\\\"").replace("'", "\\'")
modified_content = content[:insert_pos] + f'\n# 用户输入的问题\nquestion = "{escaped_question}"\n\n' + content[insert_pos:]
question_replaced = True
print(f"已在main函数前插入问题: {question}")
else:
# 如果没有main函数在文件开头插入
modified_content = f'# 用户输入的问题\nquestion = "{escaped_question}"\n\n' + content
print(f"已在文件开头插入问题: {question}")
if not question_replaced:
# 如果仍然无法替换,尝试在文件末尾添加代码来使用用户的问题
modified_content = content + f'\n\n# 用户输入的问题\nquestion = "{question}"\n\n'
# 添加monkey patch代码确保construct_society函数使用用户的问题
monkey_patch_code = f'''
# 确保construct_society函数使用用户的问题
original_construct_society = globals().get('construct_society')
if original_construct_society:
def patched_construct_society(*args, **kwargs):
# 忽略传入的参数,始终使用用户的问题
return original_construct_society("{escaped_question}")
# 替换原始函数
globals()['construct_society'] = patched_construct_society
print("已修补construct_society函数确保使用用户问题")
'''
# 在文件末尾添加monkey patch代码
modified_content += monkey_patch_code
# 如果脚本没有调用main函数添加调用代码
if has_main and "__main__" not in content:
modified_content += '''
# 如果脚本中有construct_society函数使用用户问题运行
if "construct_society" in globals():
# 确保调用main函数
if __name__ == "__main__":
main()
'''
print("已添加main函数调用代码")
# 如果脚本没有construct_society调用添加调用代码
if "construct_society" in content and "run_society" in content and "Answer:" not in content:
modified_content += f'''
# 确保执行construct_society和run_society
if "construct_society" in globals() and "run_society" in globals():
try:
society = construct_society(question)
society = construct_society("{escaped_question}")
from utils import run_society
answer, chat_history, token_count = run_society(society)
print(f"Answer: {answer}")
print(f"Answer: {{answer}}")
except Exception as e:
print(f"运行时出错: {e}")
print(f"运行时出错: {{e}}")
import traceback
traceback.print_exc()
'''
print("已添加construct_society和run_society调用代码")
# 执行修改后的脚本
try:
@@ -92,35 +117,59 @@ if "construct_society" in globals():
if str(script_dir) not in sys.path:
sys.path.insert(0, str(script_dir))
if has_main:
# 如果有main函数加载模块并调用main
# 创建临时文件
temp_script_path = script_path.with_name(f"temp_{script_path.name}")
with open(temp_script_path, "w", encoding="utf-8") as f:
f.write(modified_content)
# 创建临时文件
temp_script_path = script_path.with_name(f"temp_{script_path.name}")
with open(temp_script_path, "w", encoding="utf-8") as f:
f.write(modified_content)
print(f"已创建临时脚本文件: {temp_script_path}")
try:
# 直接执行临时脚本
print(f"开始执行脚本...")
try:
# 如果有main函数加载模块并调用main
if has_main:
# 加载临时模块
module_name = f"temp_{script_path.stem}"
module = load_module_from_path(module_name, temp_script_path)
# 确保模块中有question变量并且值是用户输入的问题
setattr(module, "question", question)
# 如果模块中有construct_society函数修补它
if hasattr(module, "construct_society"):
original_func = module.construct_society
def patched_func(*args, **kwargs):
return original_func(question)
module.construct_society = patched_func
print("已在模块级别修补construct_society函数")
# 调用main函数
if hasattr(module, "main"):
print("调用main函数...")
module.main()
else:
print(f"错误: 脚本 {script_path} 中没有main函数")
sys.exit(1)
finally:
# 删除临时文件
if temp_script_path.exists():
temp_script_path.unlink()
else:
# 如果没有main函数直接执行修改后的脚本
exec(modified_content, {"__file__": str(script_path)})
else:
# 如果没有main函数直接执行修改后的脚本
print("直接执行脚本内容...")
exec(modified_content, {"__file__": str(temp_script_path)})
except Exception as e:
print(f"执行脚本时出错: {e}")
traceback.print_exc()
sys.exit(1)
finally:
# 删除临时文件
if temp_script_path.exists():
temp_script_path.unlink()
print(f"已删除临时脚本文件: {temp_script_path}")
except Exception as e:
print(f"执行脚本时出错: {e}")
import traceback
print(f"处理脚本时出错: {e}")
traceback.print_exc()
sys.exit(1)

View File

@@ -1,129 +0,0 @@
from dotenv import load_dotenv
load_dotenv()
from camel.models import ModelFactory
from camel.toolkits import (
AudioAnalysisToolkit,
CodeExecutionToolkit,
DocumentProcessingToolkit,
ExcelToolkit,
ImageAnalysisToolkit,
SearchToolkit,
VideoAnalysisToolkit,
WebToolkit,
)
from camel.types import ModelPlatformType, ModelType
from utils import OwlRolePlaying, run_society
def construct_society(question: str) -> OwlRolePlaying:
r"""Construct a society of agents based on the given question.
Args:
question (str): The task or question to be addressed by the society.
Returns:
OwlRolePlaying: A configured society of agents ready to address the question.
"""
# Create models for different components
models = {
"user": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"assistant": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"web": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"planning": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"video": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"image": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
"search": ModelFactory.create(
model_platform=ModelPlatformType.OPENAI,
model_type=ModelType.GPT_4O,
model_config_dict={"temperature": 0},
),
}
# Configure toolkits
tools = [
*WebToolkit(
headless=False, # Set to True for headless mode (e.g., on remote servers)
web_agent_model=models["web"],
planning_agent_model=models["planning"],
).get_tools(),
*DocumentProcessingToolkit().get_tools(),
*VideoAnalysisToolkit(model=models["video"]).get_tools(), # This requires OpenAI Key
*AudioAnalysisToolkit().get_tools(), # This requires OpenAI Key
*CodeExecutionToolkit(sandbox="subprocess", verbose=True).get_tools(),
*ImageAnalysisToolkit(model=models["image"]).get_tools(),
*SearchToolkit(model=models["search"]).get_tools(),
*ExcelToolkit().get_tools(),
]
# Configure agent roles and parameters
user_agent_kwargs = {"model": models["user"]}
assistant_agent_kwargs = {"model": models["assistant"], "tools": tools}
# Configure task parameters
task_kwargs = {
"task_prompt": question,
"with_task_specify": False,
}
# Create and return the society
society = OwlRolePlaying(
**task_kwargs,
user_role_name="user",
user_agent_kwargs=user_agent_kwargs,
assistant_role_name="assistant",
assistant_agent_kwargs=assistant_agent_kwargs,
)
return society
# 用户输入的问题
question = "What is the current weather in New York?"
def main():
r"""Main function to run the OWL system with an example question."""
# Example research question
question = (
"What was the volume in m^3 of the fish bag that was calculated in "
"the University of Leicester paper `Can Hiccup Supply Enough Fish "
"to Maintain a Dragon's Diet?`"
)
# Construct and run the society
society = construct_society(question)
answer, chat_history, token_count = run_society(society)
# Output the result
print(f"Answer: {answer}")
if __name__ == "__main__":
main()