OpenHands/opendevin/controller/agent_controller.py
Engel Nyst 1115b60a74
Logging additions and fixes (#1139)
* Refactor print_to_color into a color formatter

misc fixes

catch ValueErrors and others from Router initialization

add default methods

* Tweak console log formatting, clean up after rebasing exceptions out

* Fix prompts/responses

* clean up

* keep regular colors when no msg_type

* fix filename

* handle file log first

* happy mypy

* ok, mypy

---------

Co-authored-by: Robert Brennan <accounts@rbren.io>
2024-04-16 12:55:22 -04:00

177 lines
6.3 KiB
Python

import asyncio
import inspect
import traceback
import time
from typing import List, Callable, Awaitable, cast
from opendevin.plan import Plan
from opendevin.state import State
from opendevin.agent import Agent
from opendevin.observation import Observation, AgentErrorObservation, NullObservation
from litellm.exceptions import APIConnectionError
from openai import AuthenticationError
from opendevin import config
from opendevin.logger import opendevin_logger as logger
from opendevin.exceptions import MaxCharsExceedError
from .command_manager import CommandManager
from opendevin.action import (
Action,
NullAction,
AgentFinishAction,
AddTaskAction,
ModifyTaskAction,
)
from opendevin.exceptions import AgentNoActionError
MAX_ITERATIONS = config.get('MAX_ITERATIONS')
MAX_CHARS = config.get('MAX_CHARS')
class AgentController:
id: str
agent: Agent
max_iterations: int
command_manager: CommandManager
callbacks: List[Callable]
def __init__(
self,
agent: Agent,
sid: str = '',
max_iterations: int = MAX_ITERATIONS,
max_chars: int = MAX_CHARS,
container_image: str | None = None,
callbacks: List[Callable] = [],
):
self.id = sid
self.agent = agent
self.max_iterations = max_iterations
self.command_manager = CommandManager(self.id, container_image)
self.max_chars = max_chars
self.callbacks = callbacks
def update_state_for_step(self, i):
self.state.iteration = i
self.state.background_commands_obs = self.command_manager.get_background_obs()
def update_state_after_step(self):
self.state.updated_info = []
def add_history(self, action: Action, observation: Observation):
if not isinstance(action, Action):
raise TypeError(
f'action must be an instance of Action, got {type(action).__name__} instead')
if not isinstance(observation, Observation):
raise TypeError(
f'observation must be an instance of Observation, got {type(observation).__name__} instead')
self.state.history.append((action, observation))
self.state.updated_info.append((action, observation))
async def start_loop(self, task: str):
finished = False
plan = Plan(task)
self.state = State(plan)
for i in range(self.max_iterations):
try:
finished = await self.step(i)
except Exception as e:
logger.error('Error in loop', exc_info=True)
raise e
if finished:
break
if not finished:
logger.info('Exited before finishing the task.')
async def step(self, i: int):
logger.info(f'STEP {i}', extra={'msg_type': 'STEP'})
logger.info(self.state.plan.main_goal, extra={'msg_type': 'PLAN'})
if self.state.num_of_chars > self.max_chars:
raise MaxCharsExceedError(
self.state.num_of_chars, self.max_chars)
log_obs = self.command_manager.get_background_obs()
for obs in log_obs:
self.add_history(NullAction(), obs)
await self._run_callbacks(obs)
logger.info(obs, extra={'msg_type': 'BACKGROUND LOG'})
self.update_state_for_step(i)
action: Action = NullAction()
observation: Observation = NullObservation('')
try:
action = self.agent.step(self.state)
if action is None:
raise AgentNoActionError()
logger.info(action, extra={'msg_type': 'ACTION'})
except Exception as e:
observation = AgentErrorObservation(str(e))
logger.error(e)
if isinstance(e, APIConnectionError):
time.sleep(3)
# raise specific exceptions that need to be handled outside
# note: we are using AuthenticationError class from openai rather than
# litellm because:
# 1) litellm.exceptions.AuthenticationError is a subclass of openai.AuthenticationError
# 2) embeddings call, initiated by llama-index, has no wrapper for authentication
# errors. This means we have to catch individual authentication errors
# from different providers, and OpenAI is one of these.
if isinstance(e, (AuthenticationError, AgentNoActionError)):
raise
self.update_state_after_step()
await self._run_callbacks(action)
finished = isinstance(action, AgentFinishAction)
if finished:
logger.info(action, extra={'msg_type': 'INFO'})
return True
if isinstance(action, AddTaskAction):
try:
self.state.plan.add_subtask(
action.parent, action.goal, action.subtasks)
except Exception as e:
observation = AgentErrorObservation(str(e))
logger.error(e)
traceback.print_exc()
elif isinstance(action, ModifyTaskAction):
try:
self.state.plan.set_subtask_state(action.id, action.state)
except Exception as e:
observation = AgentErrorObservation(str(e))
logger.error(e)
traceback.print_exc()
if action.executable:
try:
observation = action.run(self)
if inspect.isawaitable(observation):
observation = await cast(Awaitable[Observation], observation)
except Exception as e:
observation = AgentErrorObservation(str(e))
logger.error(e)
traceback.print_exc()
if not isinstance(observation, NullObservation):
logger.info(observation, extra={'msg_type': 'OBSERVATION'})
self.add_history(action, observation)
await self._run_callbacks(observation)
async def _run_callbacks(self, event):
if event is None:
return
for callback in self.callbacks:
idx = self.callbacks.index(callback)
try:
callback(event)
except Exception as e:
logger.exception(f'Callback error: {e}, idx: {idx}')
await asyncio.sleep(
0.001
) # Give back control for a tick, so we can await in callbacks