Server working with agent library (#97)

* server working with agent library

* update readme

* add messages to events

* factor out steps

* fix websocket messages

* allow user to run arbitrary actions

* allow user to run commands before a task is started

* fix main.py

* check JSON

* handle errors in controller better

* fix memory issue

* better error handling and task cancellation

* fix monologue len

* fix imports

* remove server from lint check

* fix lint issues

* fix lint errors
This commit is contained in:
Robert Brennan 2024-03-24 10:24:44 -04:00 committed by GitHub
parent fb1822123a
commit 4aa24eb41d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 242 additions and 252 deletions

View File

@ -29,6 +29,6 @@ jobs:
- name: Install dependencies
run: pip install ruff mypy types-requests
- name: Run ruff
run: ruff check --config dev_config/python/ruff.toml opendevin/ server/ agenthub/
run: ruff check --config dev_config/python/ruff.toml opendevin/ agenthub/
- name: Run mypy
run: mypy --config-file dev_config/python/mypy.ini opendevin/ server/ agenthub/
run: mypy --config-file dev_config/python/mypy.ini opendevin/ agenthub/

View File

@ -55,7 +55,6 @@ class CodeActAgent(Agent):
self,
instruction: str,
workspace_dir: str,
model_name: str,
max_steps: int = 100
) -> None:
"""
@ -65,7 +64,7 @@ class CodeActAgent(Agent):
- instruction (str): The instruction for the agent to execute.
- max_steps (int): The maximum number of steps to run the agent.
"""
super().__init__(instruction, workspace_dir, model_name, max_steps)
super().__init__(instruction, workspace_dir, max_steps)
self._history = [Message(Role.SYSTEM, SYSTEM_MESSAGE)]
self._history.append(Message(Role.USER, instruction))
self.env = DockerInteractive(workspace_dir=workspace_dir)

View File

@ -1,7 +1,6 @@
from typing import List
from typing import List, Any
from opendevin.agent import Agent
from agenthub.langchains_agent.utils.agent import Agent as LangchainsAgentImpl
from opendevin.lib.event import Event
@ -46,10 +45,13 @@ INITIAL_THOUGHTS = [
class LangchainsAgent(Agent):
_initialized = False
agent: Any = None
def _initialize(self):
if self._initialized:
return
if self.instruction is None or self.instruction == "":
raise ValueError("Instruction must be provided")
self.agent = LangchainsAgentImpl(self.instruction, self.model_name)
next_is_output = False
for thought in INITIAL_THOUGHTS:
@ -76,7 +78,8 @@ class LangchainsAgent(Agent):
self._initialized = True
def add_event(self, event: Event) -> None:
self.agent.add_event(event)
if self.agent:
self.agent.add_event(event)
def step(self, cmd_mgr) -> Event:
self._initialize()

View File

@ -10,7 +10,7 @@ from llama_index.vector_stores.chroma import ChromaVectorStore
class LongTermMemory:
def __init__(self):
db = chromadb.Client()
self.collection = db.create_collection(name="memories")
self.collection = db.get_or_create_collection(name="memories")
vector_store = ChromaVectorStore(chroma_collection=self.collection)
self.index = VectorStoreIndex.from_vector_store(vector_store)
self.thought_idx = 0

View File

@ -19,7 +19,6 @@ class Monologue:
def condense(self):
new_thoughts = llm.summarize_monologue(self.thoughts, self.model_name)
print("new thoughts", new_thoughts)
self.thoughts = [Event(t['action'], t['args']) for t in new_thoughts]

View File

@ -54,18 +54,17 @@ class Agent(ABC):
def __init__(
self,
instruction: str,
workspace_dir: str,
model_name: str,
max_steps: int = 100
):
self.instruction = instruction
self.instruction = ""
self.workspace_dir = workspace_dir
self.model_name = model_name
self.max_steps = max_steps
self._complete = False
self._history: List[Message] = [Message(Role.USER, instruction)]
self._history: List[Message] = []
@property
def complete(self) -> bool:

View File

@ -1,3 +1,5 @@
import asyncio
from opendevin.lib.command_manager import CommandManager
from opendevin.lib.event import Event
@ -14,35 +16,53 @@ class AgentController:
self.callbacks.append(self.agent.add_event)
self.callbacks.append(print_callback)
def maybe_perform_action(self, event):
if not (event and event.is_runnable()):
return
action = 'output'
async def add_user_event(self, event: Event):
await self.handle_action(event)
async def start_loop(self, task):
try:
output = event.run(self)
self.agent.instruction = task
for i in range(self.max_iterations):
print("STEP", i, flush=True)
done = await self.step()
if done:
print("FINISHED", flush=True)
break
except Exception as e:
output = 'Error: ' + str(e)
action = 'error'
out_event = Event(action, {'output': output})
return out_event
print("Error in loop", e, flush=True)
pass
def start_loop(self):
for i in range(self.max_iterations):
print("STEP", i, flush=True)
log_events = self.command_manager.get_background_events()
for event in log_events:
for callback in self.callbacks:
callback(event)
async def step(self) -> bool:
log_events = self.command_manager.get_background_events()
for event in log_events:
await self.run_callbacks(event)
try:
action_event = self.agent.step(self.command_manager)
for callback in self.callbacks:
callback(action_event)
if action_event.action == 'finish':
break
print("---", flush=True)
except Exception as e:
action_event = Event('error', {'error': str(e)})
if action_event is None:
action_event = Event('error', {'error': "Agent did not return an event"})
output_event = self.maybe_perform_action(action_event)
if output_event is not None:
for callback in self.callbacks:
callback(output_event)
print("==============", flush=True)
await self.handle_action(action_event)
return action_event.action == 'finish'
async def handle_action(self, event: Event):
print("=== HANDLING EVENT ===", flush=True)
await self.run_callbacks(event)
print("--- EVENT OUTPUT ---", flush=True)
output_event = event.run(self)
await self.run_callbacks(output_event)
async def run_callbacks(self, event):
if event is None:
return
for callback in self.callbacks:
idx = self.callbacks.index(callback)
try:
callback(event)
except Exception as e:
print("Callback error:" + str(idx), e, flush=True)
pass
await asyncio.sleep(0.001) # Give back control for a tick, so we can await in callbacks

View File

@ -1,14 +1,15 @@
import opendevin.lib.actions as actions
ACTION_TYPES = ['run', 'kill', 'browse', 'read', 'write', 'recall', 'think', 'summarize', 'output', 'error', 'finish']
ACTION_TYPES = ['initialize', 'start', 'summarize', 'run', 'kill', 'browse', 'read', 'write', 'recall', 'think', 'output', 'error', 'finish']
RUNNABLE_ACTIONS = ['run', 'kill', 'browse', 'read', 'write', 'recall']
class Event:
def __init__(self, action, args):
def __init__(self, action, args, message=None):
if action not in ACTION_TYPES:
raise ValueError('Invalid action type: ' + action)
self.action = action
self.args = args
self.message = message
def __str__(self):
return self.action + " " + str(self.args)
@ -25,10 +26,48 @@ class Event:
'args': self.args
}
def get_message(self) -> str:
if self.message is not None:
return self.message
if self.action == 'run':
return 'Running command: ' + self.args['command']
elif self.action == 'kill':
return 'Killing command: ' + self.args['id']
elif self.action == 'browse':
return 'Browsing: ' + self.args['url']
elif self.action == 'read':
return 'Reading file: ' + self.args['path']
elif self.action == 'write':
return 'Writing to file: ' + self.args['path']
elif self.action == 'recall':
return 'Recalling memory: ' + self.args['query']
elif self.action == 'think':
return self.args['thought']
elif self.action == 'output':
return "Got output."
elif self.action == 'error':
return "Got an error: " + self.args['output']
elif self.action == 'finish':
return "Finished!"
else:
return ""
def is_runnable(self):
return self.action in RUNNABLE_ACTIONS
def run(self, agent_controller):
if not self.is_runnable():
return None
action = 'output'
try:
output = self._run_and_get_output(agent_controller)
except Exception as e:
output = 'Error: ' + str(e)
action = 'error'
out_event = Event(action, {'output': output})
return out_event
def _run_and_get_output(self, agent_controller) -> str:
if self.action == 'run':
cmd = self.args['command']
background = False

View File

@ -1,4 +1,5 @@
from typing import Type
import asyncio
import argparse
from opendevin.agent import Agent
@ -16,10 +17,8 @@ if __name__ == "__main__":
AgentCls: Type[Agent] = Agent.get_cls(args.agent_cls)
agent = AgentCls(
instruction=args.task,
workspace_dir=args.directory,
model_name=args.model_name
)
controller = AgentController(agent, args.directory)
controller.start_loop()
asyncio.run(controller.start_loop(args.task))

View File

@ -0,0 +1,18 @@
# OpenDevin server
This is currently just a POC that starts an echo websocket inside docker, and
forwards messages between the client and the docker container.
## Start the Server
```
python -m pip install -r requirements.txt
uvicorn opendevin.server.listen:app --reload --port 3000
```
## Test the Server
You can use `websocat` to test the server: https://github.com/vi/websocat
```
websocat ws://127.0.0.1:3000/ws
{"action": "start", "args": {"task": "write a bash script that prints hello"}}
```

View File

@ -0,0 +1,13 @@
from opendevin.server.session import Session
from fastapi import FastAPI, WebSocket
app = FastAPI()
# This endpoint recieves events from the client (i.e. the browser)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
session = Session(websocket)
# TODO: should this use asyncio instead of await?
await session.start_listening()

110
opendevin/server/session.py Normal file
View File

@ -0,0 +1,110 @@
import os
import asyncio
from typing import Optional
from fastapi import WebSocketDisconnect
from opendevin.agent import Agent
from opendevin.controller import AgentController
from opendevin.lib.event import Event
def parse_event(data):
if "action" not in data:
return None
action = data["action"]
args = {}
if "args" in data:
args = data["args"]
message = None
if "message" in data:
message = data["message"]
return Event(action, args, message)
class Session:
def __init__(self, websocket):
self.websocket = websocket
self.controller: Optional[AgentController] = None
self.agent: Optional[Agent] = None
self.agent_task = None
asyncio.create_task(self.create_controller(), name="create controller") # FIXME: starting the docker container synchronously causes a websocket error...
async def send_error(self, message):
await self.send({"error": True, "message": message})
async def send_message(self, message):
await self.send({"message": message})
async def send(self, data):
if self.websocket is None:
return
try:
await self.websocket.send_json(data)
except Exception as e:
print("Error sending data to client", e)
async def start_listening(self):
try:
while True:
try:
data = await self.websocket.receive_json()
except ValueError:
await self.send_error("Invalid JSON")
continue
event = parse_event(data)
if event is None:
await self.send_error("Invalid event")
continue
if event.action == "initialize":
await self.create_controller(event)
elif event.action == "start":
await self.start_task(event)
else:
if self.controller is None:
await self.send_error("No agent started. Please wait a second...")
else:
await self.controller.add_user_event(event)
except WebSocketDisconnect as e:
self.websocket = None
if self.agent_task:
self.agent_task.cancel()
print("Client websocket disconnected", e)
async def create_controller(self, start_event=None):
directory = os.getcwd()
if start_event and "directory" in start_event.args:
directory = start_event.args["directory"]
agent_cls = "LangchainsAgent"
if start_event and "agent_cls" in start_event.args:
agent_cls = start_event.args["agent_cls"]
model = "gpt-4-0125-preview"
if start_event and "model" in start_event.args:
model = start_event.args["model"]
AgentCls = Agent.get_cls(agent_cls)
self.agent = AgentCls(
workspace_dir=directory,
model_name=model,
)
self.controller = AgentController(self.agent, directory, callbacks=[self.on_agent_event])
await self.send_message("Control loop started")
async def start_task(self, start_event):
if "task" not in start_event.args:
await self.send_error("No task specified")
return
await self.send_message("Starting new task...")
task = start_event.args["task"]
if self.controller is None:
await self.send_error("No agent started. Please wait a second...")
return
self.agent_task = asyncio.create_task(self.controller.start_loop(task), name="agent loop")
def on_agent_event(self, event):
evt = {
"action": event.action,
"message": event.get_message(),
"args": event.args,
}
asyncio.create_task(self.send(evt), name="send event in callback")

View File

@ -1,27 +0,0 @@
# OpenDevin server
This is currently just a POC that starts an echo websocket inside docker, and
forwards messages between the client and the docker container.
## Start the Server
```
cd server
python -m pip install -r requirements.txt
uvicorn server:app --reload --port 3000
```
## Test the Server
You can use `websocat` to test the server: https://github.com/vi/websocat
```
websocat ws://127.0.0.1:3000/ws
{"source":"client","action":"start"}
```
### Test cases
We should be robust to these cases:
* Client connects, sends start command, agent starts up, client disconnects
* Client connects, sends start command, disconnects before agent starts
* Client connects, sends start command, agent disconnects (i.e. docker container is killed)
* Client connects, sends start command, agent starts up, client sends second start command
In each case, the client should be able to reconnect and send a start command

View File

@ -1,182 +0,0 @@
import json
import os
from time import sleep
import docker
import websockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.websockets import WebSocketState
app = FastAPI()
CONTAINER_NAME = "devin-agent"
AGENT_LISTEN_PORT = 8080
AGENT_BIND_PORT = os.environ.get("AGENT_PORT", 4522)
MAX_WAIT_TIME_SECONDS = 30
agent_listener = None
client_fast_websocket = None
agent_websocket = None
def get_message_payload(message):
return {"source": "server", "message": message}
def get_error_payload(message):
payload = get_message_payload(message)
payload["error"] = True
return payload
# This endpoint recieves events from the client (i.e. the browser)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
global client_fast_websocket
global agent_websocket
await websocket.accept()
client_fast_websocket = websocket
try:
while True:
data = await websocket.receive_json()
if "action" not in data:
await send_message_to_client(get_error_payload("No action specified"))
continue
action = data["action"]
if action == "start":
await send_message_to_client(get_message_payload("Starting new agent..."))
directory = os.getcwd()
if "directory" in data:
directory = data["directory"]
try:
await restart_docker_container(directory)
except Exception as e:
print("error while restarting docker container:", e)
await send_message_to_client(get_error_payload("Failed to start container: " + str(e)))
continue
if action == "terminal":
msg = {
"action": "terminal",
"data": data["data"]
}
await send_message_to_client(get_message_payload(msg))
else:
if agent_websocket is None:
await send_message_to_client(get_error_payload("Agent not connected"))
continue
except WebSocketDisconnect:
print("Client websocket disconnected")
await close_all_websockets(get_error_payload("Client disconnected"))
async def stop_docker_container():
docker_client = docker.from_env()
try:
container = docker_client.containers.get(CONTAINER_NAME)
container.stop()
container.remove()
elapsed = 0
while container.status != "exited":
print("waiting for container to stop...")
sleep(1)
elapsed += 1
if elapsed > MAX_WAIT_TIME_SECONDS:
break
container = docker_client.containers.get(CONTAINER_NAME)
except docker.errors.NotFound:
pass
async def restart_docker_container(directory):
await stop_docker_container()
docker_client = docker.from_env()
container = docker_client.containers.run(
"jmalloc/echo-server",
name=CONTAINER_NAME,
detach=True,
ports={str(AGENT_LISTEN_PORT) + "/tcp": AGENT_BIND_PORT},
volumes={directory: {"bind": "/workspace", "mode": "rw"}})
# wait for container to be ready
elapsed = 0
while container.status != "running":
if container.status == "exited":
print("container exited")
print("container logs:")
print(container.logs())
break
print("waiting for container to start...")
sleep(1)
elapsed += 1
container = docker_client.containers.get(CONTAINER_NAME)
if elapsed > MAX_WAIT_TIME_SECONDS:
break
if container.status != "running":
raise Exception("Failed to start container")
async def listen_for_agent_messages():
global agent_websocket
global client_fast_websocket
try:
async with websockets.connect("ws://localhost:" + str(AGENT_BIND_PORT)) as ws:
agent_websocket = ws
await send_message_to_client(get_message_payload("Agent connected!"))
await send_message_to_agent({"source": "server", "message": "Hello, agent!"})
try:
async for message in agent_websocket:
if client_fast_websocket is None:
print("Client websocket not connected")
await close_all_websockets(get_error_payload("Client not connected"))
break
try:
data = json.loads(message)
except Exception as e:
print("error parsing message from agent:", message)
print(e)
continue
if "source" not in data or data["source"] != "agent":
# TODO: remove this once we're not using echo server
print("echo server responded", data)
continue
await send_message_to_agent(data)
except websockets.exceptions.ConnectionClosed:
await send_message_to_client(get_error_payload("Agent disconnected"))
except Exception as e:
print("error connecting to agent:", e)
payload = get_error_payload("Failed to connect to agent: " + str(e))
await send_message_to_client(payload)
await close_agent_websocket(payload)
async def send_message_to_client(data):
print("to client:", data)
if client_fast_websocket is None:
return
await client_fast_websocket.send_json(data)
async def send_message_to_agent(data):
print("to agent:", data)
if agent_websocket is None:
return
await agent_websocket.send(json.dumps(data))
async def close_agent_websocket(payload):
global agent_websocket
if agent_websocket is not None:
if not agent_websocket.closed:
await send_message_to_agent(payload)
await agent_websocket.close()
agent_websocket = None
await stop_docker_container()
async def close_client_websocket(payload):
global client_fast_websocket
if client_fast_websocket is not None:
if client_fast_websocket.client_state != WebSocketState.DISCONNECTED:
await send_message_to_client(payload)
await client_fast_websocket.close()
client_fast_websocket = None
async def close_all_websockets(payload):
await close_agent_websocket(payload)
await close_client_websocket(payload)