eigent/backend/app/service/chat_service.py
wol 03f2c49b7e
Some checks failed
Remove old artifacts / remove-old-artifacts (push) Has been cancelled
init
2025-08-04 00:20:29 +08:00

492 lines
20 KiB
Python

import asyncio
import datetime
from pathlib import Path
import platform
from typing import Literal
from fastapi import Request
from inflection import titleize
from pydash import chain
from app.component.debug import dump_class
from app.component.environment import env
from app.service.task import (
ActionImproveData,
ActionInstallMcpData,
ActionNewAgent,
create_task_lock,
delete_task_lock,
)
from camel.toolkits import AgentCommunicationToolkit, ToolkitMessageIntegration
from app.utils.toolkit.human_toolkit import HumanToolkit
from app.utils.toolkit.note_taking_toolkit import NoteTakingToolkit
from app.utils.workforce import Workforce
from loguru import logger
from app.model.chat import Chat, NewAgent, Status, sse_json, TaskContent
from camel.tasks import Task
from app.utils.agent import (
ListenChatAgent,
agent_model,
get_mcp_tools,
get_toolkits,
mcp_agent,
developer_agent,
document_agent,
multi_modal_agent,
search_agent,
social_medium_agent,
task_summary_agent,
question_confirm_agent,
)
from app.service.task import Action, Agents
from app.utils.server.sync_step import sync_step
from camel.types import ModelPlatformType
from camel.models import ModelProcessingError
@sync_step
async def step_solve(options: Chat, request: Request):
# if True:
# import faulthandler
# faulthandler.enable()
# for second in [5, 10, 20, 30, 60, 120, 240]:
# faulthandler.dump_traceback_later(second)
task_lock = create_task_lock(options.task_id)
start_event_loop = True
question_agent = question_confirm_agent(options)
camel_task = None
workforce = None
while True:
if await request.is_disconnected():
if workforce is not None:
workforce.stop()
break
try:
item = await task_lock.get_queue()
logger.info(f"item: {dump_class(item)}")
except Exception as e:
logger.error(f"Error getting item from queue: {e}")
break
try:
if item.action == Action.improve or start_event_loop:
# from viztracer import VizTracer
# tracer = VizTracer()
# tracer.start()
if start_event_loop is True:
question = options.question
start_event_loop = False
else:
assert isinstance(item, ActionImproveData)
question = item.data
if len(question) < 12 and len(options.attaches) == 0:
confirm = await question_confirm(question_agent, question)
else:
confirm = True
if confirm is not True:
yield confirm
else:
yield sse_json("confirmed", "")
(workforce, mcp) = await construct_workforce(options)
for new_agent in options.new_agents:
workforce.add_single_agent_worker(
format_agent_description(new_agent), await new_agent_model(new_agent, options)
)
summary_task_agent = task_summary_agent(options)
task_lock.status = Status.confirmed
question = question + options.summary_prompt
camel_task = Task(content=question, id=options.task_id)
if len(options.attaches) > 0:
camel_task.additional_info = {Path(file_path).name: file_path for file_path in options.attaches}
sub_tasks = workforce.eigent_make_sub_tasks(camel_task)
summary_task_content = await summary_task(summary_task_agent, camel_task)
yield to_sub_tasks(camel_task, summary_task_content)
# tracer.stop()
# tracer.save("trace.json")
if env("debug") == "on":
task_lock.status = Status.processing
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
task_lock.add_background_task(task)
elif item.action == Action.update_task:
assert camel_task is not None
update_tasks = {item.id: item for item in item.data.task}
sub_tasks = update_sub_tasks(sub_tasks, update_tasks)
add_sub_tasks(camel_task, item.data.task)
yield to_sub_tasks(camel_task, summary_task_content)
elif item.action == Action.start:
task_lock.status = Status.processing
task = asyncio.create_task(workforce.eigent_start(sub_tasks))
task_lock.add_background_task(task)
elif item.action == Action.task_state:
yield sse_json("task_state", item.data)
elif item.action == Action.create_agent:
yield sse_json("create_agent", item.data)
elif item.action == Action.activate_agent:
yield sse_json("activate_agent", item.data)
elif item.action == Action.deactivate_agent:
yield sse_json("deactivate_agent", dict(item.data))
elif item.action == Action.assign_task:
yield sse_json("assign_task", item.data)
elif item.action == Action.activate_toolkit:
yield sse_json("activate_toolkit", item.data)
elif item.action == Action.deactivate_toolkit:
yield sse_json("deactivate_toolkit", item.data)
elif item.action == Action.write_file:
yield sse_json(
"write_file",
{"file_path": item.data, "process_task_id": item.process_task_id},
)
elif item.action == Action.ask:
yield sse_json("ask", item.data)
elif item.action == Action.notice:
yield sse_json(
"notice",
{"notice": item.data, "process_task_id": item.process_task_id},
)
elif item.action == Action.search_mcp:
yield sse_json("search_mcp", item.data)
elif item.action == Action.install_mcp:
task = asyncio.create_task(install_mcp(mcp, item))
task_lock.add_background_task(task)
elif item.action == Action.terminal:
yield sse_json(
"terminal",
{"output": item.data, "process_task_id": item.process_task_id},
)
elif item.action == Action.pause:
if workforce is not None:
workforce.pause()
elif item.action == Action.resume:
if workforce is not None:
workforce.resume()
elif item.action == Action.new_agent:
if workforce is not None:
workforce.pause()
workforce.add_single_agent_worker(format_agent_description(item), await new_agent_model(item, options))
workforce.resume()
elif item.action == Action.end:
assert camel_task is not None
task_lock.status = Status.done
yield sse_json("end", str(camel_task.result))
if workforce is not None:
workforce.stop_gracefully()
break
elif item.action == Action.supplement:
assert camel_task is not None
task_lock.status = Status.processing
camel_task.add_subtask(
Task(
content=item.data.question,
id=f"{camel_task.id}.{len(camel_task.subtasks)}",
)
)
task = asyncio.create_task(workforce.eigent_start(camel_task.subtasks))
task_lock.add_background_task(task)
elif item.action == Action.budget_not_enough:
if workforce is not None:
workforce.pause()
yield sse_json(Action.budget_not_enough, {"message": "budget not enouth"})
elif item.action == Action.stop:
if workforce is not None:
if workforce._running:
workforce.stop()
workforce.stop_gracefully()
await delete_task_lock(task_lock.id)
break
else:
logger.warning(f"Unknown action: {item.action}")
except ModelProcessingError as e:
if "Budget has been exceeded" in str(e):
# workforce decompose task don't use ListenAgent, this need return sse
if 'workforce' in locals() and workforce is not None:
workforce.pause()
yield sse_json(Action.budget_not_enough, {"message": "budget not enouth"})
else:
logger.error(f"Error processing action {item.action}: {e}")
yield sse_json("error", {"message": str(e)})
if 'workforce' in locals() and workforce is not None and workforce._running:
workforce.stop()
except Exception as e:
logger.error(f"Error processing action {item.action}: {e}")
raise e
# Continue processing other items instead of breaking
async def install_mcp(
mcp: ListenChatAgent,
install_mcp: ActionInstallMcpData,
):
mcp.add_tools(await get_mcp_tools(install_mcp.data))
def to_sub_tasks(task: Task, summary_task_content: str):
return sse_json(
"to_sub_tasks",
{
"summary_task": summary_task_content,
"sub_tasks": tree_sub_tasks(task.subtasks),
},
)
def tree_sub_tasks(sub_tasks: list[Task], depth: int = 0):
if depth > 5:
return []
return (
chain(sub_tasks)
.map(
lambda x: {
"id": x.id,
"content": x.content,
"state": x.state,
"subtasks": tree_sub_tasks(x.subtasks, depth + 1),
}
)
.value()
)
def update_sub_tasks(sub_tasks: list[Task], update_tasks: dict[str, TaskContent], depth: int = 0):
if depth > 5: # limit the depth of the recursion
return []
i = 0
while i < len(sub_tasks):
item = sub_tasks[i]
if item.id in update_tasks:
item.content = update_tasks[item.id].content
update_sub_tasks(item.subtasks, update_tasks, depth + 1)
i += 1
else:
sub_tasks.pop(i)
return sub_tasks
def add_sub_tasks(camel_task: Task, update_tasks: list[TaskContent]):
for item in update_tasks:
if item.id == "": #
camel_task.add_subtask(
Task(
content=item.content,
id=f"{camel_task.id}.{len(camel_task.subtasks) + 1}",
)
)
async def question_confirm(agent: ListenChatAgent, prompt: str) -> str | Literal[True]:
prompt = f"""
> **Your Role:** You are a highly capable agent. Your primary function is to analyze a user's request and determine the appropriate course of action.
>
> **Your Process:**
>
> 1. **Analyze the User's Query:** Carefully examine the user's request: `{prompt}`.
>
> 2. **Categorize the Query:**
> * **Simple Query:** Is this a simple greeting, a question that can be answered directly, or a conversational interaction (e.g., "hello", "thank you")?
> * **Complex Task:** Is this a request that requires a series of steps, code execution, or interaction with tools to complete?
>
> 3. **Execute Your Decision:**
> * **For a Simple Query:** Provide a direct and helpful response.
> * **For a Complex Task:** Your *only* response should be "yes". This will trigger a specialized workforce to handle the task. Do not include any other text, punctuation, or pleasantries.
"""
resp = agent.step(prompt)
logger.info(f"resp: {agent.chat_history}")
if resp.msgs[0].content.lower() != "yes":
return sse_json("wait_confirm", {"content": resp.msgs[0].content})
else:
return True
async def summary_task(agent: ListenChatAgent, task: Task) -> str:
prompt = f"""The user's task is:
---
{task.to_string()}
---
Your instructions are:
1. Come up with a short and descriptive name for this task.
2. Create a concise summary of the task's main points and objectives.
3. Return the task name and the summary, separated by a vertical bar (|).
Example format: "Task Name|This is the summary of the task."
Do not include any other text or formatting.
"""
res = agent.step(prompt)
logger.info(f"summary_task: {res.msgs[0].content}")
return res.msgs[0].content
async def construct_workforce(options: Chat) -> tuple[Workforce, ListenChatAgent]:
working_directory = options.file_save_path()
[coordinator_agent, task_agent] = [
agent_model(
key,
prompt,
options,
[
*(
ToolkitMessageIntegration(
message_handler=HumanToolkit(options.task_id, key).send_message_to_user
).register_toolkits(NoteTakingToolkit(options.task_id, working_directory=working_directory))
).get_tools()
],
)
for key, prompt in {
Agents.coordinator_agent: f"""
You are a helpful coordinator.
- You are now working in system {platform.system()} with architecture
{platform.machine()} at working directory `{working_directory}`. All your
work related to local operations should be done in that directory.
The current date is {datetime.date.today()}. For any date-related tasks, you MUST use this as the current date.
- If a task assigned to another agent fails, you should re-assign it to the
`Developer_Agent`. The `Developer_Agent` is a powerful agent with terminal
access and can resolve a wide range of issues.
""",
Agents.task_agent: f"""
You are a helpful task planner.
- You are now working in system {platform.system()} with architecture
{platform.machine()} at working directory `{working_directory}`. All your
work related to local operations should be done in that directory.
The current date is {datetime.date.today()}. For any date-related tasks, you MUST use this as the current date.
""",
}.items()
]
new_worker_agent = agent_model(
Agents.new_worker_agent,
f"""
You are a helpful assistant.
- You are now working in system {platform.system()} with architecture
{platform.machine()} at working directory `{working_directory}`. All your
work related to local operations should be done in that directory.
The current date is {datetime.date.today()}. For any date-related tasks, you MUST use this as the current date.
""",
options,
[
*HumanToolkit.get_can_use_tools(options.task_id, Agents.new_worker_agent),
*(
ToolkitMessageIntegration(
message_handler=HumanToolkit(options.task_id, Agents.new_worker_agent).send_message_to_user
).register_toolkits(NoteTakingToolkit(options.task_id, working_directory=working_directory))
).get_tools(),
],
)
# msg_toolkit = AgentCommunicationToolkit(max_message_history=100)
searcher = search_agent(options)
developer = await developer_agent(options)
documenter = await document_agent(options)
multi_modaler = multi_modal_agent(options)
# msg_toolkit.register_agent("Worker", new_worker_agent)
# msg_toolkit.register_agent("Search_Agent", searcher)
# msg_toolkit.register_agent("Developer_Agent", developer)
# msg_toolkit.register_agent("Document_Agent", documenter)
# msg_toolkit.register_agent("Multi_Modal_Agent", multi_modaler)
workforce = Workforce(
options.task_id,
"A workforce",
graceful_shutdown_timeout=3, # 30 seconds for debugging
share_memory=False,
coordinator_agent=coordinator_agent,
task_agent=task_agent,
new_worker_agent=new_worker_agent,
use_structured_output_handler=False if options.model_platform == ModelPlatformType.OPENAI else True,
)
workforce.add_single_agent_worker(
"Developer Agent: A master-level coding assistant with a powerful "
"terminal. It can write and execute code, manage files, automate "
"desktop tasks, and deploy web applications to solve complex "
"technical challenges.",
developer,
)
workforce.add_single_agent_worker(
"Search Agent: Can search the web, extract webpage content, "
"simulate browser actions, and provide relevant information to "
"solve the given task.",
searcher,
)
workforce.add_single_agent_worker(
"Document Agent: A document processing assistant skilled in creating "
"and modifying a wide range of file formats. It can generate "
"text-based files/reports (Markdown, JSON, YAML, HTML), "
"office documents (Word, PDF), presentations (PowerPoint), and "
"data files (Excel, CSV).",
documenter,
)
workforce.add_single_agent_worker(
"Multi-Modal Agent: A specialist in media processing. It can "
"analyze images and audio, transcribe speech, download videos, and "
"generate new images from text prompts.",
multi_modaler,
)
# workforce.add_single_agent_worker(
# "Social Media Agent: A social media management assistant for "
# "handling tasks related to WhatsApp, Twitter, LinkedIn, Reddit, "
# "Notion, Slack, and other social platforms.",
# await social_medium_agent(options),
# )
mcp = await mcp_agent(options)
# workforce.add_single_agent_worker(
# "MCP Agent: A Model Context Protocol agent that provides access "
# "to external tools and services through MCP integrations.",
# mcp,
# )
return workforce, mcp
def format_agent_description(agent_data: NewAgent | ActionNewAgent) -> str:
r"""Format a comprehensive agent description including name, tools, and
description.
"""
description_parts = [f"{agent_data.name}:"]
# Add description if available
if hasattr(agent_data, "description") and agent_data.description:
description_parts.append(agent_data.description.strip())
else:
description_parts.append("A specialized agent")
# Add tools information
tool_names = []
if hasattr(agent_data, "tools") and agent_data.tools:
for tool in agent_data.tools:
tool_names.append(titleize(tool))
if hasattr(agent_data, "mcp_tools") and agent_data.mcp_tools:
for mcp_server in agent_data.mcp_tools.get("mcpServers", {}).keys():
tool_names.append(titleize(mcp_server))
if tool_names:
description_parts.append(f"with access to {', '.join(tool_names)} tools : <{tool_names}>")
return " ".join(description_parts)
async def new_agent_model(data: NewAgent | ActionNewAgent, options: Chat):
working_directory = options.file_save_path()
tool_names = []
tools = [*await get_toolkits(data.tools, data.name, options.task_id)]
for item in data.tools:
tool_names.append(titleize(item))
if data.mcp_tools is not None:
tools = [*tools, *await get_mcp_tools(data.mcp_tools)]
for item in data.mcp_tools["mcpServers"].keys():
tool_names.append(titleize(item))
for item in tools:
logger.debug(f"new agent function tool ====== {item.func.__name__}")
# Enhanced system message with platform information
enhanced_description = f"""{data.description}
- You are now working in system {platform.system()} with architecture
{platform.machine()} at working directory `{working_directory}`. All your
work related to local operations should be done in that directory.
The current date is {datetime.date.today()}. For any date-related tasks, you
MUST use this as the current date.
"""
return agent_model(data.name, enhanced_description, options, tools, tool_names=tool_names)