eigent/backend/app/utils/workforce.py
wol 03f2c49b7e
Some checks failed
Remove old artifacts / remove-old-artifacts (push) Has been cancelled
init
2025-08-04 00:20:29 +08:00

272 lines
10 KiB
Python

import asyncio
from typing import Generator, List
from camel.agents import ChatAgent
from camel.societies.workforce.workforce import (
Workforce as BaseWorkforce,
WorkforceState,
DEFAULT_WORKER_POOL_SIZE,
)
from camel.societies.workforce.task_channel import TaskChannel
from camel.societies.workforce.base import BaseNode
from camel.societies.workforce.utils import TaskAssignResult
from loguru import logger
from camel.tasks.task import Task, TaskState, validate_task_content
from app.component import code
from app.exception.exception import UserException
from app.utils.agent import ListenChatAgent
from app.service.task import (
Action,
ActionAssignTaskData,
ActionEndData,
ActionTaskStateData,
get_camel_task,
get_task_lock,
)
from app.utils.single_agent_worker import SingleAgentWorker
# === Debug sinks ===
# Options shared by both file sinks below: rotate at "00:00", keep files for
# "7 days", hand records to the sink through a queue (enqueue=True), and
# accept everything from DEBUG level upwards.
_WF_SINK_OPTIONS = {
    "rotation": "00:00",
    "retention": "7 days",
    "enqueue": True,
    "level": "DEBUG",
}


def _is_wf_trace_record(record) -> bool:
    """Return True when the record's message starts with the "[WF]" marker."""
    return record["message"].startswith("[WF]")


# Full debug sink: detailed dependency debug logs, one file name per day.
logger.add("logs/workforce_debug_{time:YYYY-MM-DD}.log", **_WF_SINK_OPTIONS)

# Independent trace sink: only the "[WF]" lines emitted by the Workforce
# overrides, so the dependency chain can be read without the other noise.
logger.add(
    "logs/wf_trace_{time:YYYY-MM-DD-HH}.log",
    filter=_is_wf_trace_record,
    **_WF_SINK_OPTIONS,
)
class Workforce(BaseWorkforce):
    """Workforce that mirrors task lifecycle events onto the API task queue.

    Every override forwards to the ``BaseWorkforce`` implementation and, in
    addition, puts an action object on the queue of the task lock obtained
    from ``get_task_lock(self.api_task_id)``. According to the inline notes
    in this module, those actions are the notifications sent to the frontend.
    """

    def __init__(
        self,
        api_task_id: str,
        description: str,
        children: List[BaseNode] | None = None,
        coordinator_agent: ChatAgent | None = None,
        task_agent: ChatAgent | None = None,
        new_worker_agent: ChatAgent | None = None,
        graceful_shutdown_timeout: float = 3,
        share_memory: bool = False,
        use_structured_output_handler: bool = True,
    ) -> None:
        """Store the API task id and initialise the base workforce.

        Args:
            api_task_id: Key used with ``get_task_lock`` to find the queue
                that receives this workforce's notifications.
            description, children, coordinator_agent, task_agent,
            new_worker_agent, graceful_shutdown_timeout, share_memory,
            use_structured_output_handler: Passed unchanged to
                ``BaseWorkforce.__init__``.
        """
        # Set before super().__init__() so the id is already available
        # should the base constructor invoke any overridden method.
        self.api_task_id = api_task_id
        super().__init__(
            description=description,
            children=children,
            coordinator_agent=coordinator_agent,
            task_agent=task_agent,
            new_worker_agent=new_worker_agent,
            graceful_shutdown_timeout=graceful_shutdown_timeout,
            share_memory=share_memory,
            use_structured_output_handler=use_structured_output_handler,
        )

    def eigent_make_sub_tasks(self, task: Task) -> list[Task]:
        """Validate ``task``, reset the workforce and decompose it.

        This is the first half of the base ``process_task`` flow, which is
        split here into ``eigent_make_sub_tasks`` and ``eigent_start``.

        Args:
            task: The main task to decompose.

        Returns:
            The subtasks produced by ``_decompose_task``. When that method
            returns a generator (streaming mode), all yielded batches are
            flattened into one list.

        Raises:
            UserException: If ``validate_task_content`` rejects the task.
        """
        if not validate_task_content(task.content, task.id):
            task.state = TaskState.FAILED
            task.result = "Task failed: Invalid or empty content provided"
            # Guard the preview: falsy content (e.g. None) must not crash the
            # log call before the intended UserException is raised.
            preview = (task.content or "")[:50]
            logger.warning(
                f"Task {task.id} rejected: Invalid or empty content. Content preview: '{preview}...'"
            )
            raise UserException(code.error, task.result)

        self.reset()
        self._task = task
        self._state = WorkforceState.RUNNING
        task.state = TaskState.OPEN
        self._pending_tasks.append(task)

        # Decompose the task into subtasks first.
        subtasks_result = self._decompose_task(task)

        if isinstance(subtasks_result, Generator):
            # Streaming mode: each yielded item is a batch of new tasks.
            return [sub for batch in subtasks_result for sub in batch]
        # Non-streaming mode: already a plain list.
        return subtasks_result

    async def eigent_start(self, subtasks: list[Task]):
        """Queue ``subtasks`` and run the workforce until ``start()`` returns.

        Args:
            subtasks: Tasks to execute, normally the result of
                ``eigent_make_sub_tasks``.

        Raises:
            Exception: Anything raised by ``self.start()`` is logged, the
                state is set to ``STOPPED`` and the exception is re-raised.
        """
        logger.debug(f"start the workforce {subtasks=}")
        # extendleft() inserts one by one at the front, which would reverse
        # the order; reversing first keeps the subtasks in their given order.
        self._pending_tasks.extendleft(reversed(subtasks))
        self.set_channel(TaskChannel())
        # Save initial snapshot.
        self.save_snapshot("Initial task decomposition")

        try:
            await self.start()
        except Exception as e:
            logger.error(f"Error in workforce execution: {e}")
            self._state = WorkforceState.STOPPED
            raise
        finally:
            # Return to IDLE unless the run ended in the STOPPED state.
            if self._state != WorkforceState.STOPPED:
                self._state = WorkforceState.IDLE

    async def _find_assignee(self, tasks: List[Task]) -> TaskAssignResult:
        """Assign tasks via the base class and queue a "waiting" notice each.

        The "waiting" notification is sent at assignment time; the matching
        "running" notification is sent later from ``_post_task``.

        Args:
            tasks: Tasks to be assigned.

        Returns:
            The unmodified result of ``BaseWorkforce._find_assignee``.
        """
        assigned = await super()._find_assignee(tasks)
        task_lock = get_task_lock(self.api_task_id)

        for item in assigned.assignments:
            # Trace which worker got the task and what it depends on.
            logger.debug(f"[WF] ASSIGN {item.task_id} -> {item.assignee_id} deps={item.dependencies}")

            # The main task itself does not need a notification.
            if self._task and item.task_id == self._task.id:
                continue

            # Look up the task content; fall back to "" when it is not found.
            task_obj = get_camel_task(item.task_id, tasks)
            content = task_obj.content if task_obj else ""

            # Send the notification in the background instead of awaiting it.
            notify_task = asyncio.create_task(
                task_lock.put_queue(
                    ActionAssignTaskData(
                        action=Action.assign_task,
                        data={
                            "assignee_id": item.assignee_id,
                            "task_id": item.task_id,
                            "content": content,
                            "state": "waiting",  # Mark as waiting state
                        },
                    )
                )
            )
            # Track the background task on the lock for cleanup.
            task_lock.add_background_task(notify_task)

        return assigned

    async def _post_task(self, task: Task, assignee_id: str) -> None:
        """Queue a "running" notice, then publish the task via the base class.

        Per the original notes, this runs once the dependency check has
        passed and the task is about to be published for execution.

        Args:
            task: The task being published.
            assignee_id: Id of the worker the task is posted to.
        """
        # Trace the moment the task is actually posted.
        logger.debug(f"[WF] POST {task.id} -> {assignee_id}")

        task_lock = get_task_lock(self.api_task_id)
        if self._task and task.id != self._task.id:  # Skip the main task itself
            await task_lock.put_queue(
                ActionAssignTaskData(
                    action=Action.assign_task,
                    data={
                        "assignee_id": assignee_id,
                        "task_id": task.id,
                        "content": task.content,
                        "state": "running",  # running state
                    },
                )
            )

        # Continue with the normal task publishing process.
        await super()._post_task(task, assignee_id)

    def add_single_agent_worker(
        self, description: str, worker: ListenChatAgent, pool_max_size: int = DEFAULT_WORKER_POOL_SIZE
    ) -> BaseWorkforce:
        """Wrap ``worker`` in a ``SingleAgentWorker`` and add it as a child.

        Args:
            description: Description given to the new worker node.
            worker: The agent that will execute tasks.
            pool_max_size: Forwarded to ``SingleAgentWorker``.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            RuntimeError: If the workforce is currently in the RUNNING state.
        """
        if self._state == WorkforceState.RUNNING:
            raise RuntimeError("Cannot add workers while workforce is running. Pause the workforce first.")

        # Validate worker agent compatibility.
        self._validate_agent_compatibility(worker, "Worker agent")
        # Ensure the worker agent shares this workforce's pause control.
        self._attach_pause_event_to_agent(worker)

        worker_node = SingleAgentWorker(
            description=description,
            worker=worker,
            pool_max_size=pool_max_size,
            use_structured_output_handler=self.use_structured_output_handler,
        )
        self._children.append(worker_node)

        # If a channel is already set up, share it with the new worker.
        if hasattr(self, "_channel") and self._channel is not None:
            worker_node.set_channel(self._channel)

        # If the workforce is paused, start the worker's listening task.
        self._start_child_node_when_paused(worker_node.start())

        if self.metrics_logger:
            self.metrics_logger.log_worker_created(
                worker_id=worker_node.node_id,
                worker_type="SingleAgentWorker",
                role=worker_node.description,
            )
        return self

    async def _handle_completed_task(self, task: Task) -> None:
        """Queue the task's final state, then run the base completion logic.

        Args:
            task: The task that completed.
        """
        logger.debug(f"[WF] DONE {task.id}")

        task_lock = get_task_lock(self.api_task_id)
        await task_lock.put_queue(
            ActionTaskStateData(
                data={
                    "task_id": task.id,
                    "content": task.content,
                    "state": task.state,
                    "result": task.result or "",
                    "failure_count": task.failure_count,
                },
            )
        )
        return await super()._handle_completed_task(task)

    async def _handle_failed_task(self, task: Task) -> bool:
        """Run the base failure handling, then queue the resulting state.

        The base handler runs first, so the queued ``state`` and
        ``failure_count`` are the values it left on the task. The error text
        is taken from the most recent matching ``task_failed`` entry of the
        metrics logger, when one is available.

        Args:
            task: The task that failed.

        Returns:
            Whatever ``BaseWorkforce._handle_failed_task`` returned.
        """
        logger.debug(f"[WF] FAIL {task.id} retry={task.failure_count}")

        result = await super()._handle_failed_task(task)

        error_message = ""
        if self.metrics_logger and hasattr(self.metrics_logger, "log_entries"):
            # Walk newest-first so the latest failure of this task wins.
            for entry in reversed(self.metrics_logger.log_entries):
                if entry.get("event_type") == "task_failed" and entry.get("task_id") == task.id:
                    # A missing/None message must become "", not the text "None".
                    error_message = entry.get("error_message") or ""
                    break

        task_lock = get_task_lock(self.api_task_id)
        await task_lock.put_queue(
            ActionTaskStateData(
                data={
                    "task_id": task.id,
                    "content": task.content,
                    "state": task.state,
                    "failure_count": task.failure_count,
                    "result": str(error_message),
                }
            )
        )
        return result

    def stop(self) -> None:
        """Stop the workforce and queue an ``ActionEndData`` in the background.

        NOTE: ``asyncio.create_task`` needs a running event loop, so this
        must be called from code running inside one.
        """
        super().stop()
        task_lock = get_task_lock(self.api_task_id)
        end_task = asyncio.create_task(task_lock.put_queue(ActionEndData()))
        # Track the background task on the lock for cleanup.
        task_lock.add_background_task(end_task)

    async def cleanup(self) -> None:
        r"""Clean up resources when workforce is done.

        Best effort: any exception is logged and not propagated.
        """
        try:
            # Imported locally, as in the original code (presumably to avoid
            # an import cycle - TODO confirm).
            from app.service.task import delete_task_lock

            await delete_task_lock(self.api_task_id)
        except Exception as e:
            logger.error(f"Error cleaning up workforce resources: {e}")