OpenHands/opendevin/controller/agent_controller.py
Xingyao Wang 1c7cdbefdd
feat(CodeActAgent): Support Agent-User Interaction during Task Execution and the Full Integration of CodeActAgent (#1290)
* initialize plugin definition

* initialize plugin definition

* simplify mixin

* further improve plugin mixin

* add cache dir for pip

* support clean up cache

* add script for setup jupyter and execution server

* integrate JupyterRequirement to ssh_box

* source bashrc at the end of plugin load

* add execute_cli that accept code via stdin

* make JUPYTER_EXEC_SERVER_PORT configurable via env var

* increase background cmd sleep time

* Update opendevin/sandbox/plugins/mixin.py

Co-authored-by: Robert Brennan <accounts@rbren.io>

* add mixin to base class

* make jupyter requirement a dataclass

* source plugins only when >0 requirements

* add `sandbox_plugins` for each agent & have controller take care of it

* update build.sh to make logs available in /opendevin/logs

* switch to use config for lib and cache dir

* Add SANDBOX_WORKSPACE_DIR into config

* Add SANDBOX_WORKSPACE_DIR into config

* fix occurrence of /workspace

* fix permission issue with /workspace

* use python to implement execute_cli to avoid stdin escape issue

* add IPythonRunCellAction and get it working

* wait until jupyter is available

* support plugin via copying instead of mounting

* add agent talk action

* support follow-up user language feedback

* add __str__ for action to be printed better

* only print PLAN at the beginning

* wip: update codeact agent

* get rid of the initial message

* update codeact agent to handle null action;
add thought to bash

* dispatch thought for RUN action as well

* fix weird behavior of pxssh where the output would not flush correctly

* make ssh box handle exit_code properly as well

* add initial version of swe-agent plugin;

* rename swe cursors

* split setup script into two and create two requirements

* print SWE-agent command documentation

* update swe-agent to default to no custom docs

* add initial version of swe-agent plugin;

* rename swe cursors

* split setup script into two and create two requirements

* print SWE-agent command documentation

* update swe-agent to default to no custom docs

* update dockerfile with dependency from swe-agent

* make env setup a separate script for .bashrc source

* add wip prompt

* fix mount_dir for ssh_box

* update prompt

* fix mount_dir for ssh_box

* default to use host network

* default to use host network

* move prompt to a separate file

* fix swe-tool plugins;
add missing _split_string

* remove hostname from sshbox

* update the prompt with edit functionality

* fix swe-tool plugins;
add missing _split_string

* add awaiting into status bar

* fix the bug of additional send event

* remove some print action

* move logic to config.py

* remove debugging comments

* make host network as default

* make WORKSPACE_MOUNT_PATH as abspath

* implement execute_cli via file cp

* Revert "implement execute_cli via file cp"

This reverts commit 06f0155bc17d1f99097e71b83b2143f6e8092654.

* add codeact dependencies to default container

* add IPythonRunCellObservation

* add back cache dir and default to /tmp

* make USE_HOST_NETWORK a bool

* revert use host network to false

* add temporary fix for IPython RUN action

* update prompt

* revert USE_HOST_NETWORK to true since it is not affecting anything

* attempt to fix lint

* remove newline

* fix jupyter execution server

* add `thought` to most action classes

* fix unit tests for current action abstraction

* support user exit

* update test cases with the latest action format (added 'thought')

* fix integration test for CodeActAgent by mocking stdin

* only mock stdin for tests with user_responses.log

* remove -exec integration test for CodeActAgent since it is not supported

* remove specific stop word

* fix comments

* improve clarity of prompt

* fix py lint

* fix integration tests

* sandbox might fail in chown due to mounting, but it won't be fatal

* update debug instruction for sshbox

* fix typo

* get RUN_AS_DEVIN and network=host working with app sandbox

* get RUN_AS_DEVIN and network=host working with app sandbox

* attempt to fix the workspace base permission

* sandbox might fail in chown due to mounting, but it won't be fatal

* update sshbox instruction

* remove default user id since it will be passed in the instruction

* revert permission fix since it should be resolved by correct SANDBOX_USER_ID

* the permission issue can be fixed by simply providing the correct env var

* remove log

* set sandbox user id to getuid by default

* move logging to initializer

* make the uid consistent across host, app container, and sandbox

* remove hostname as it causes sudo issue

* fix permission of entrypoint script

* make the uvicorn app run as host user uid for jupyter plugin

* add warning message

* update dev md for instruction of running unit tests

* add back unit tests

* revert back to the original sandbox implementation to fix testcases

* revert use host network

* get docker socket gid and usermod instead of chmod 777

* allow unit test workflow to find docker.sock

* make sandbox test working via patch

* fix arg parser that's broken for some reason

* try to fix app build disk space issue

* fix integration test

* Revert "fix arg parser that's broken for some reason"

This reverts commit 6cc89611337bb74555fd16b4be78681fb7e36573.

* update Development.md

* cleanup integration tests & add exception for CodeAct+execbox

* fix config

* implement user_message action

* fix doc

* fix event dict error

* fix frontend lint

* revert accidental changes to integration tests

* revert accidental changes to integration tests

---------

Co-authored-by: Robert Brennan <accounts@rbren.io>
Co-authored-by: Robert Brennan <contact@rbren.io>
2024-05-01 08:40:00 -04:00
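The headline change of this commit is the agent-user hand-off in `AgentController`: when the agent emits an `AgentTalkAction`, `step()` calls `wait_for_user_input()`, which sets `TaskState.AWAITING_USER_INPUT` and blocks on an `asyncio.Queue` until `add_user_message()` puts the reply on the queue and flips the state back to `RUNNING`. The sketch below reproduces only that queue-based pattern with the standard library, as an illustration rather than the actual implementation. `MiniController`, its string-based history, and the trimmed `TaskState` enum are hypothetical stand-ins, not OpenDevin APIs; the STDIN fallback, callbacks, and state notifications are left out.

```python
import asyncio
from enum import Enum


class TaskState(str, Enum):
    # Hypothetical stand-in for opendevin.schema.TaskState (only the states used here).
    RUNNING = 'running'
    AWAITING_USER_INPUT = 'awaiting_user_input'


class MiniController:
    """Simplified sketch of the AgentTalkAction hand-off; not the real controller."""

    def __init__(self) -> None:
        self.task_state = TaskState.RUNNING
        self.queue: asyncio.Queue = asyncio.Queue()
        self.history: list = []

    async def wait_for_user_input(self) -> str:
        # Like wait_for_user_input(): flag the state, then block on the queue.
        self.task_state = TaskState.AWAITING_USER_INPUT
        message = await self.queue.get()
        self.queue.task_done()
        return message

    async def add_user_message(self, message: str) -> None:
        # Like add_user_message(): route the message according to the current state.
        if self.task_state == TaskState.AWAITING_USER_INPUT:
            self.queue.put_nowait(message)  # wakes the waiting step
            self.task_state = TaskState.RUNNING
        elif self.task_state == TaskState.RUNNING:
            self.history.append(('null', message))  # recorded as (NullAction, message)
        else:
            raise ValueError(f'cannot add user message in state {self.task_state}')

    async def talk_step(self, question: str) -> str:
        # A talk action suspends this step until the user answers.
        reply = await self.wait_for_user_input()
        self.history.append((question, reply))
        return reply


async def demo():
    ctl = MiniController()
    step = asyncio.create_task(ctl.talk_step('Which file should I edit?'))
    await asyncio.sleep(0)  # let the step run until it blocks on queue.get()
    assert ctl.task_state == TaskState.AWAITING_USER_INPUT
    await ctl.add_user_message('main.py')  # e.g. a message arriving from the UI
    reply = await step
    return reply, ctl.task_state, ctl.history


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Using a queue rather than polling a shared variable lets the waiting step sleep without consuming CPU, and it lets a message that arrives from another coroutine (here `demo`, in the app presumably a websocket handler) resume the step directly.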

332 lines
12 KiB
Python

import asyncio
from typing import Callable, List, Type

from agenthub.codeact_agent.codeact_agent import CodeActAgent
from opendevin import config
from opendevin.action import (
    Action,
    AgentDelegateAction,
    AgentFinishAction,
    AgentTalkAction,
    NullAction,
)
from opendevin.action.tasks import TaskStateChangedAction
from opendevin.agent import Agent
from opendevin.controller.action_manager import ActionManager
from opendevin.exceptions import (
    AgentMalformedActionError,
    AgentNoActionError,
    LLMOutputError,
    MaxCharsExceedError,
)
from opendevin.logger import opendevin_logger as logger
from opendevin.observation import (
    AgentDelegateObservation,
    AgentErrorObservation,
    NullObservation,
    Observation,
    UserMessageObservation,
)
from opendevin.plan import Plan
from opendevin.sandbox import DockerSSHBox
from opendevin.schema import TaskState
from opendevin.schema.config import ConfigType
from opendevin.state import State

MAX_ITERATIONS = config.get(ConfigType.MAX_ITERATIONS)
MAX_CHARS = config.get(ConfigType.MAX_CHARS)


class AgentController:
    id: str
    agent: Agent
    max_iterations: int
    action_manager: ActionManager
    callbacks: List[Callable]
    delegate: 'AgentController | None' = None
    state: State | None = None
    _task_state: TaskState = TaskState.INIT
    _cur_step: int = 0

    def __init__(
        self,
        agent: Agent,
        inputs: dict | None = None,
        sid: str = 'default',
        max_iterations: int = MAX_ITERATIONS,
        max_chars: int = MAX_CHARS,
        callbacks: List[Callable] | None = None,
    ):
        self.id = sid
        self.agent = agent
        self.max_iterations = max_iterations
        self.action_manager = ActionManager(self.id)
        self.max_chars = max_chars
        # Avoid a shared mutable default: each controller gets its own list
        # unless the caller passes one in (delegates deliberately share it).
        self.callbacks = callbacks if callbacks is not None else []
        # Initialize agent-required plugins for sandbox (if any)
        self.action_manager.init_sandbox_plugins(agent.sandbox_plugins)
        if isinstance(agent, CodeActAgent) and not isinstance(
            self.action_manager.sandbox, DockerSSHBox
        ):
            logger.warning(
                'CodeActAgent requires DockerSSHBox as its sandbox! Other sandboxes '
                '(LocalBox, DockerExecBox) are not stateful and will not work properly.'
            )
        self._await_user_message_queue: asyncio.Queue = asyncio.Queue()

    def update_state_for_step(self, i):
        if self.state is None:
            return
        self.state.iteration = i
        self.state.background_commands_obs = self.action_manager.get_background_obs()

    def update_state_after_step(self):
        if self.state is None:
            return
        self.state.updated_info = []

    def add_history(self, action: Action, observation: Observation):
        if self.state is None:
            return
        if not isinstance(action, Action):
            raise TypeError(
                f'action must be an instance of Action, got {type(action).__name__} instead'
            )
        if not isinstance(observation, Observation):
            raise TypeError(
                f'observation must be an instance of Observation, got {type(observation).__name__} instead'
            )
        self.state.history.append((action, observation))
        self.state.updated_info.append((action, observation))

    async def _run(self):
        if self.state is None:
            return

        if self._task_state != TaskState.RUNNING:
            raise ValueError('Task is not in running state')

        for i in range(self._cur_step, self.max_iterations):
            self._cur_step = i
            try:
                finished = await self.step(i)
                if finished:
                    self._task_state = TaskState.FINISHED
            except Exception:
                logger.error('Error in loop', exc_info=True)
                await self._run_callbacks(
                    AgentErrorObservation(
                        'Oops! Something went wrong while completing your task. '
                        'You can check the logs for more info.'
                    )
                )
                await self.set_task_state_to(TaskState.STOPPED)
                break

            if self._task_state == TaskState.FINISHED:
                logger.info('Task finished by agent')
                await self.reset_task()
                break
            elif self._task_state == TaskState.STOPPED:
                logger.info('Task stopped by user')
                await self.reset_task()
                break
            elif self._task_state == TaskState.PAUSED:
                logger.info('Task paused')
                self._cur_step = i + 1
                await self.notify_task_state_changed()
                break

            if self._is_stuck():
                logger.info('Loop detected, stopping task')
                observation = AgentErrorObservation('I got stuck in a loop, the task has stopped.')
                await self._run_callbacks(observation)
                await self.set_task_state_to(TaskState.STOPPED)
                break

    async def setup_task(self, task: str, inputs: dict | None = None):
        """Sets up the agent controller with a task."""
        self._task_state = TaskState.RUNNING
        await self.notify_task_state_changed()
        self.state = State(Plan(task))
        self.state.inputs = inputs if inputs is not None else {}

    async def start(self, task: str):
        """Starts the agent controller with a task.

        If a task has already run and was not reset, the step counter
        continues from the last step.
        """
        await self.setup_task(task)
        await self._run()

    async def resume(self):
        if self.state is None:
            raise ValueError('No task to resume')

        self._task_state = TaskState.RUNNING
        await self.notify_task_state_changed()

        await self._run()

    async def reset_task(self):
        self.state = None
        self._cur_step = 0
        self._task_state = TaskState.INIT
        self.agent.reset()
        await self.notify_task_state_changed()

    async def set_task_state_to(self, state: TaskState):
        self._task_state = state
        if state == TaskState.STOPPED:
            await self.reset_task()
        logger.info(f'Task state set to {state}')

    def get_task_state(self):
        """Returns the current state of the agent task."""
        return self._task_state

    async def notify_task_state_changed(self):
        await self._run_callbacks(TaskStateChangedAction(self._task_state))

    async def add_user_message(self, message: UserMessageObservation):
        if self.state is None:
            return

        if self._task_state == TaskState.AWAITING_USER_INPUT:
            self._await_user_message_queue.put_nowait(message)

            # set the task state to running
            self._task_state = TaskState.RUNNING
            await self.notify_task_state_changed()

        elif self._task_state == TaskState.RUNNING:
            self.add_history(NullAction(), message)

        else:
            raise ValueError(
                f'Task (state: {self._task_state}) is not in a state to add user message'
            )

    async def wait_for_user_input(self) -> UserMessageObservation:
        self._task_state = TaskState.AWAITING_USER_INPUT
        await self.notify_task_state_changed()
        # wait for the next user message
        if len(self.callbacks) == 0:
            logger.info(
                'Use STDIN to request user message as no callbacks are registered',
                extra={'msg_type': 'INFO'},
            )
            # NOTE: input() blocks the event loop until the user answers.
            message = input('Request user input [type /exit to stop interaction] >> ')
            user_message_observation = UserMessageObservation(message)
            # add_user_message() is never called on this path, so mark the
            # task as running again here.
            self._task_state = TaskState.RUNNING
            await self.notify_task_state_changed()
        else:
            # add_user_message() puts the reply on the queue and sets RUNNING
            user_message_observation = await self._await_user_message_queue.get()
            self._await_user_message_queue.task_done()
        return user_message_observation

    async def start_delegate(self, action: AgentDelegateAction):
        AgentCls: Type[Agent] = Agent.get_cls(action.agent)
        agent = AgentCls(llm=self.agent.llm)
        self.delegate = AgentController(
            sid=self.id + '-delegate',
            agent=agent,
            max_iterations=self.max_iterations,
            max_chars=self.max_chars,
            callbacks=self.callbacks,
        )
        task = action.inputs.get('task') or ''
        await self.delegate.setup_task(task, action.inputs)

    async def step(self, i: int) -> bool:
        if self.state is None:
            raise ValueError('No task to run')
        if self.delegate is not None:
            delegate_done = await self.delegate.step(i)
            if delegate_done:
                outputs = self.delegate.state.outputs if self.delegate.state else {}
                obs: Observation = AgentDelegateObservation(content='', outputs=outputs)
                self.add_history(NullAction(), obs)
                self.delegate = None
                self.delegateAction = None
            return False

        logger.info(f'STEP {i}', extra={'msg_type': 'STEP'})
        if i == 0:
            logger.info(self.state.plan.main_goal, extra={'msg_type': 'PLAN'})
        if self.state.num_of_chars > self.max_chars:
            raise MaxCharsExceedError(self.state.num_of_chars, self.max_chars)

        log_obs = self.action_manager.get_background_obs()
        for obs in log_obs:
            self.add_history(NullAction(), obs)
            await self._run_callbacks(obs)
            logger.info(obs, extra={'msg_type': 'BACKGROUND LOG'})

        self.update_state_for_step(i)
        action: Action = NullAction()
        observation: Observation = NullObservation('')
        try:
            action = self.agent.step(self.state)
            if action is None:
                raise AgentNoActionError('No action was returned')
        except (AgentMalformedActionError, AgentNoActionError, LLMOutputError) as e:
            observation = AgentErrorObservation(str(e))
        logger.info(action, extra={'msg_type': 'ACTION'})

        self.update_state_after_step()

        await self._run_callbacks(action)

        # a talk action pauses the step until the user replies
        if isinstance(action, AgentTalkAction):
            # wait for the next user message
            user_message_observation = await self.wait_for_user_input()
            logger.info(user_message_observation, extra={'msg_type': 'OBSERVATION'})
            self.add_history(action, user_message_observation)
            return False

        finished = isinstance(action, AgentFinishAction)
        if finished:
            self.state.outputs = action.outputs  # type: ignore[attr-defined]
            logger.info(action, extra={'msg_type': 'INFO'})
            return True

        if isinstance(observation, NullObservation):
            observation = await self.action_manager.run_action(action, self)

        if not isinstance(observation, NullObservation):
            logger.info(observation, extra={'msg_type': 'OBSERVATION'})

        self.add_history(action, observation)
        await self._run_callbacks(observation)
        return False

    async def _run_callbacks(self, event):
        if event is None:
            return
        # enumerate() gives the real position even if a callback is registered twice
        for idx, callback in enumerate(self.callbacks):
            try:
                await callback(event)
            except Exception as e:
                logger.exception(f'Callback error: {e}, idx: {idx}')
        # Give back control for a tick, so we can await in callbacks
        await asyncio.sleep(0.001)

    def get_state(self):
        return self.state

    def _is_stuck(self):
        if self.state is None or self.state.history is None or len(self.state.history) < 3:
            return False

        # if the last three (Action, Observation) tuples are too repetitive,
        # the agent got stuck in a loop
        if all(
            self.state.history[-i][0] == self.state.history[-3][0] for i in range(1, 3)
        ):
            # a repeated action alone is tolerated, unless:
            if all(
                isinstance(self.state.history[-i][1], NullObservation) for i in range(1, 4)
            ):
                # same (Action, NullObservation): like 'think' the same thought over and over
                logger.debug('Action, NullObservation loop detected')
                return True
            elif all(
                isinstance(self.state.history[-i][1], AgentErrorObservation)
                for i in range(1, 4)
            ):
                # (NullAction, AgentErrorObservation): errors coming from an exception
                # (Action, AgentErrorObservation): the same action getting an error, even if not necessarily the same error
                logger.debug('Action, AgentErrorObservation loop detected')
                return True

        return False
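The loop-detection rule in `_is_stuck` can be isolated and exercised on its own. The following is a minimal sketch that applies the same rule to simplified stand-in types: the last three actions must all be equal, and their observations must be either all null or all errors. `Act`, `NullObs`, `ErrorObs`, and `is_stuck` are hypothetical names, not OpenDevin classes, and the sketch assumes actions compare by value, as dataclass instances do.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Act:
    # Hypothetical stand-in for an Action; dataclass equality compares field values.
    kind: str
    arg: str = ''


class NullObs:
    """Hypothetical stand-in for NullObservation."""


class ErrorObs:
    """Hypothetical stand-in for AgentErrorObservation."""

    def __init__(self, msg: str) -> None:
        self.msg = msg


def is_stuck(history: list) -> bool:
    """Same rule as AgentController._is_stuck, on (action, observation) pairs."""
    if len(history) < 3:
        return False
    last_three = history[-3:]
    # All three actions must be equal.
    if not all(action == last_three[0][0] for action, _ in last_three):
        return False
    observations = [obs for _, obs in last_three]
    # Stuck only if every observation is null, or every observation is an error.
    return all(isinstance(o, NullObs) for o in observations) or all(
        isinstance(o, ErrorObs) for o in observations
    )


think = Act('think', 'same thought')
print(is_stuck([(think, NullObs())] * 3))  # True
```

Because all three observations must be of one kind, an agent that repeats an action but gets real output each time (for example, polling a long-running command) is not stopped, and neither is one whose repeated action alternates between errors and empty results.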