From 4aa24eb41d51392b17bb4a2df50554fd73f89834 Mon Sep 17 00:00:00 2001 From: Robert Brennan Date: Sun, 24 Mar 2024 10:24:44 -0400 Subject: [PATCH] Server working with agent library (#97) * server working with agent library * update readme * add messages to events * factor out steps * fix websocket messages * allow user to run arbitrary actions * allow user to run commands before a task is started * fix main.py * check JSON * handle errors in controller better * fix memory issue * better error handling and task cancellation * fix monologue len * fix imports * remove server from lint check * fix lint issues * fix lint errors --- .github/workflows/lint.yml | 4 +- agenthub/codeact_agent/__init__.py | 3 +- agenthub/langchains_agent/__init__.py | 9 +- agenthub/langchains_agent/utils/memory.py | 2 +- agenthub/langchains_agent/utils/monologue.py | 1 - opendevin/agent.py | 5 +- opendevin/controller.py | 72 +++++--- opendevin/lib/event.py | 43 ++++- opendevin/main.py | 5 +- opendevin/server/README.md | 18 ++ opendevin/server/listen.py | 13 ++ opendevin/server/session.py | 110 +++++++++++ server/README.md | 27 --- server/server.py | 182 ------------------- 14 files changed, 242 insertions(+), 252 deletions(-) create mode 100644 opendevin/server/README.md create mode 100644 opendevin/server/listen.py create mode 100644 opendevin/server/session.py delete mode 100644 server/README.md delete mode 100644 server/server.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 71dbd313ff..3a7d42b105 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -29,6 +29,6 @@ jobs: - name: Install dependencies run: pip install ruff mypy types-requests - name: Run ruff - run: ruff check --config dev_config/python/ruff.toml opendevin/ server/ agenthub/ + run: ruff check --config dev_config/python/ruff.toml opendevin/ agenthub/ - name: Run mypy - run: mypy --config-file dev_config/python/mypy.ini opendevin/ server/ agenthub/ + run: mypy --config-file dev_config/python/mypy.ini opendevin/ agenthub/ diff --git a/agenthub/codeact_agent/__init__.py b/agenthub/codeact_agent/__init__.py index 829627d94d..8b925723b6 100644 --- a/agenthub/codeact_agent/__init__.py +++ b/agenthub/codeact_agent/__init__.py @@ -55,7 +55,6 @@ class CodeActAgent(Agent): self, instruction: str, workspace_dir: str, - model_name: str, max_steps: int = 100 ) -> None: """ @@ -65,7 +64,7 @@ class CodeActAgent(Agent): - instruction (str): The instruction for the agent to execute. - max_steps (int): The maximum number of steps to run the agent. """ - super().__init__(instruction, workspace_dir, model_name, max_steps) + super().__init__(instruction, workspace_dir, max_steps) self._history = [Message(Role.SYSTEM, SYSTEM_MESSAGE)] self._history.append(Message(Role.USER, instruction)) self.env = DockerInteractive(workspace_dir=workspace_dir) diff --git a/agenthub/langchains_agent/__init__.py b/agenthub/langchains_agent/__init__.py index 4296f89ced..b1a06b4526 100644 --- a/agenthub/langchains_agent/__init__.py +++ b/agenthub/langchains_agent/__init__.py @@ -1,7 +1,6 @@ -from typing import List +from typing import List, Any from opendevin.agent import Agent - from agenthub.langchains_agent.utils.agent import Agent as LangchainsAgentImpl from opendevin.lib.event import Event @@ -46,10 +45,13 @@ INITIAL_THOUGHTS = [ class LangchainsAgent(Agent): _initialized = False + agent: Any = None def _initialize(self): if self._initialized: return + if self.instruction is None or self.instruction == "": + raise ValueError("Instruction must be provided") self.agent = LangchainsAgentImpl(self.instruction, self.model_name) next_is_output = False for thought in INITIAL_THOUGHTS: @@ -76,7 +78,8 @@ class LangchainsAgent(Agent): self._initialized = True def add_event(self, event: Event) -> None: - self.agent.add_event(event) + if self.agent: + self.agent.add_event(event) def step(self, cmd_mgr) -> Event: self._initialize() diff --git a/agenthub/langchains_agent/utils/memory.py b/agenthub/langchains_agent/utils/memory.py index f8ec812ada..c73bc613e9 100644 --- a/agenthub/langchains_agent/utils/memory.py +++ b/agenthub/langchains_agent/utils/memory.py @@ -10,7 +10,7 @@ from llama_index.vector_stores.chroma import ChromaVectorStore class LongTermMemory: def __init__(self): db = chromadb.Client() - self.collection = db.create_collection(name="memories") + self.collection = db.get_or_create_collection(name="memories") vector_store = ChromaVectorStore(chroma_collection=self.collection) self.index = VectorStoreIndex.from_vector_store(vector_store) self.thought_idx = 0 diff --git a/agenthub/langchains_agent/utils/monologue.py b/agenthub/langchains_agent/utils/monologue.py index b4c5675c06..53ca081c54 100644 --- a/agenthub/langchains_agent/utils/monologue.py +++ b/agenthub/langchains_agent/utils/monologue.py @@ -19,7 +19,6 @@ class Monologue: def condense(self): new_thoughts = llm.summarize_monologue(self.thoughts, self.model_name) - print("new thoughts", new_thoughts) self.thoughts = [Event(t['action'], t['args']) for t in new_thoughts] diff --git a/opendevin/agent.py b/opendevin/agent.py index 5274c9ed1b..c6cf0bc150 100644 --- a/opendevin/agent.py +++ b/opendevin/agent.py @@ -54,18 +54,17 @@ class Agent(ABC): def __init__( self, - instruction: str, workspace_dir: str, model_name: str, max_steps: int = 100 ): - self.instruction = instruction + self.instruction = "" self.workspace_dir = workspace_dir self.model_name = model_name self.max_steps = max_steps self._complete = False - self._history: List[Message] = [Message(Role.USER, instruction)] + self._history: List[Message] = [] @property def complete(self) -> bool: diff --git a/opendevin/controller.py b/opendevin/controller.py index 69ad529a8d..008440ed60 100644 --- a/opendevin/controller.py +++ b/opendevin/controller.py @@ -1,3 +1,5 @@ +import asyncio + from opendevin.lib.command_manager import CommandManager from opendevin.lib.event import Event @@ -14,35 +16,53 @@ class AgentController: self.callbacks.append(self.agent.add_event) self.callbacks.append(print_callback) - def maybe_perform_action(self, event): - if not (event and event.is_runnable()): - return - action = 'output' + async def add_user_event(self, event: Event): + await self.handle_action(event) + + async def start_loop(self, task): try: - output = event.run(self) + self.agent.instruction = task + for i in range(self.max_iterations): + print("STEP", i, flush=True) + done = await self.step() + if done: + print("FINISHED", flush=True) + break except Exception as e: - output = 'Error: ' + str(e) - action = 'error' - out_event = Event(action, {'output': output}) - return out_event + print("Error in loop", e, flush=True) + pass - def start_loop(self): - for i in range(self.max_iterations): - print("STEP", i, flush=True) - log_events = self.command_manager.get_background_events() - for event in log_events: - for callback in self.callbacks: - callback(event) + async def step(self) -> bool: + log_events = self.command_manager.get_background_events() + for event in log_events: + await self.run_callbacks(event) + + try: action_event = self.agent.step(self.command_manager) - for callback in self.callbacks: - callback(action_event) - if action_event.action == 'finish': - break - print("---", flush=True) + except Exception as e: + action_event = Event('error', {'error': str(e)}) + if action_event is None: + action_event = Event('error', {'error': "Agent did not return an event"}) - output_event = self.maybe_perform_action(action_event) - if output_event is not None: - for callback in self.callbacks: - callback(output_event) - print("==============", flush=True) + await self.handle_action(action_event) + return action_event.action == 'finish' + + async def handle_action(self, event: Event): + print("=== HANDLING EVENT ===", flush=True) + await self.run_callbacks(event) + print("--- EVENT OUTPUT ---", flush=True) + output_event = event.run(self) + await self.run_callbacks(output_event) + + async def run_callbacks(self, event): + if event is None: + return + for callback in self.callbacks: + idx = self.callbacks.index(callback) + try: + callback(event) + except Exception as e: + print("Callback error:" + str(idx), e, flush=True) + pass + await asyncio.sleep(0.001) # Give back control for a tick, so we can await in callbacks diff --git a/opendevin/lib/event.py b/opendevin/lib/event.py index 6a07edb622..bf938c5b72 100644 --- a/opendevin/lib/event.py +++ b/opendevin/lib/event.py @@ -1,14 +1,15 @@ import opendevin.lib.actions as actions -ACTION_TYPES = ['run', 'kill', 'browse', 'read', 'write', 'recall', 'think', 'summarize', 'output', 'error', 'finish'] +ACTION_TYPES = ['initialize', 'start', 'summarize', 'run', 'kill', 'browse', 'read', 'write', 'recall', 'think', 'output', 'error', 'finish'] RUNNABLE_ACTIONS = ['run', 'kill', 'browse', 'read', 'write', 'recall'] class Event: - def __init__(self, action, args): + def __init__(self, action, args, message=None): if action not in ACTION_TYPES: raise ValueError('Invalid action type: ' + action) self.action = action self.args = args + self.message = message def __str__(self): return self.action + " " + str(self.args) @@ -25,10 +26,48 @@ class Event: 'args': self.args } + def get_message(self) -> str: + if self.message is not None: + return self.message + if self.action == 'run': + return 'Running command: ' + self.args['command'] + elif self.action == 'kill': + return 'Killing command: ' + self.args['id'] + elif self.action == 'browse': + return 'Browsing: ' + self.args['url'] + elif self.action == 'read': + return 'Reading file: ' + self.args['path'] + elif self.action == 'write': + return 'Writing to file: ' + self.args['path'] + elif self.action == 'recall': + return 'Recalling memory: ' + self.args['query'] + elif self.action == 'think': + return self.args['thought'] + elif self.action == 'output': + return "Got output." + elif self.action == 'error': + return "Got an error: " + self.args['output'] + elif self.action == 'finish': + return "Finished!" + else: + return "" + def is_runnable(self): return self.action in RUNNABLE_ACTIONS def run(self, agent_controller): + if not self.is_runnable(): + return None + action = 'output' + try: + output = self._run_and_get_output(agent_controller) + except Exception as e: + output = 'Error: ' + str(e) + action = 'error' + out_event = Event(action, {'output': output}) + return out_event + + def _run_and_get_output(self, agent_controller) -> str: if self.action == 'run': cmd = self.args['command'] background = False diff --git a/opendevin/main.py b/opendevin/main.py index c841b3dab4..2c335ad41c 100644 --- a/opendevin/main.py +++ b/opendevin/main.py @@ -1,4 +1,5 @@ from typing import Type +import asyncio import argparse from opendevin.agent import Agent @@ -16,10 +17,8 @@ if __name__ == "__main__": AgentCls: Type[Agent] = Agent.get_cls(args.agent_cls) agent = AgentCls( - instruction=args.task, workspace_dir=args.directory, model_name=args.model_name ) - controller = AgentController(agent, args.directory) - controller.start_loop() + asyncio.run(controller.start_loop(args.task)) diff --git a/opendevin/server/README.md b/opendevin/server/README.md new file mode 100644 index 0000000000..21e7fe9b74 --- /dev/null +++ b/opendevin/server/README.md @@ -0,0 +1,18 @@ +# OpenDevin server +This is currently just a POC that starts an echo websocket inside docker, and +forwards messages between the client and the docker container. + +## Start the Server +``` +python -m pip install -r requirements.txt +uvicorn opendevin.server.listen:app --reload --port 3000 +``` + +## Test the Server +You can use `websocat` to test the server: https://github.com/vi/websocat + +``` +websocat ws://127.0.0.1:3000/ws +{"action": "start", "args": {"task": "write a bash script that prints hello"}} +``` + diff --git a/opendevin/server/listen.py b/opendevin/server/listen.py new file mode 100644 index 0000000000..5aedac709c --- /dev/null +++ b/opendevin/server/listen.py @@ -0,0 +1,13 @@ +from opendevin.server.session import Session +from fastapi import FastAPI, WebSocket + +app = FastAPI() + +# This endpoint recieves events from the client (i.e. the browser) +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + await websocket.accept() + session = Session(websocket) + # TODO: should this use asyncio instead of await? + await session.start_listening() + diff --git a/opendevin/server/session.py b/opendevin/server/session.py new file mode 100644 index 0000000000..95443c5c2a --- /dev/null +++ b/opendevin/server/session.py @@ -0,0 +1,110 @@ +import os +import asyncio +from typing import Optional + +from fastapi import WebSocketDisconnect + +from opendevin.agent import Agent +from opendevin.controller import AgentController +from opendevin.lib.event import Event + +def parse_event(data): + if "action" not in data: + return None + action = data["action"] + args = {} + if "args" in data: + args = data["args"] + message = None + if "message" in data: + message = data["message"] + return Event(action, args, message) + +class Session: + def __init__(self, websocket): + self.websocket = websocket + self.controller: Optional[AgentController] = None + self.agent: Optional[Agent] = None + self.agent_task = None + asyncio.create_task(self.create_controller(), name="create controller") # FIXME: starting the docker container synchronously causes a websocket error... + + async def send_error(self, message): + await self.send({"error": True, "message": message}) + + async def send_message(self, message): + await self.send({"message": message}) + + async def send(self, data): + if self.websocket is None: + return + try: + await self.websocket.send_json(data) + except Exception as e: + print("Error sending data to client", e) + + async def start_listening(self): + try: + while True: + try: + data = await self.websocket.receive_json() + except ValueError: + await self.send_error("Invalid JSON") + continue + + event = parse_event(data) + if event is None: + await self.send_error("Invalid event") + continue + if event.action == "initialize": + await self.create_controller(event) + elif event.action == "start": + await self.start_task(event) + else: + if self.controller is None: + await self.send_error("No agent started. Please wait a second...") + else: + await self.controller.add_user_event(event) + + except WebSocketDisconnect as e: + self.websocket = None + if self.agent_task: + self.agent_task.cancel() + print("Client websocket disconnected", e) + + async def create_controller(self, start_event=None): + directory = os.getcwd() + if start_event and "directory" in start_event.args: + directory = start_event.args["directory"] + agent_cls = "LangchainsAgent" + if start_event and "agent_cls" in start_event.args: + agent_cls = start_event.args["agent_cls"] + model = "gpt-4-0125-preview" + if start_event and "model" in start_event.args: + model = start_event.args["model"] + + AgentCls = Agent.get_cls(agent_cls) + self.agent = AgentCls( + workspace_dir=directory, + model_name=model, + ) + self.controller = AgentController(self.agent, directory, callbacks=[self.on_agent_event]) + await self.send_message("Control loop started") + + async def start_task(self, start_event): + if "task" not in start_event.args: + await self.send_error("No task specified") + return + await self.send_message("Starting new task...") + task = start_event.args["task"] + if self.controller is None: + await self.send_error("No agent started. Please wait a second...") + return + self.agent_task = asyncio.create_task(self.controller.start_loop(task), name="agent loop") + + def on_agent_event(self, event): + evt = { + "action": event.action, + "message": event.get_message(), + "args": event.args, + } + asyncio.create_task(self.send(evt), name="send event in callback") diff --git a/server/README.md b/server/README.md deleted file mode 100644 index 1b63fc8fd2..0000000000 --- a/server/README.md +++ /dev/null @@ -1,27 +0,0 @@ -# OpenDevin server -This is currently just a POC that starts an echo websocket inside docker, and -forwards messages between the client and the docker container. - -## Start the Server -``` -cd server -python -m pip install -r requirements.txt -uvicorn server:app --reload --port 3000 -``` - -## Test the Server -You can use `websocat` to test the server: https://github.com/vi/websocat - -``` -websocat ws://127.0.0.1:3000/ws -{"source":"client","action":"start"} -``` - -### Test cases -We should be robust to these cases: -* Client connects, sends start command, agent starts up, client disconnects -* Client connects, sends start command, disconnects before agent starts -* Client connects, sends start command, agent disconnects (i.e. docker container is killed) -* Client connects, sends start command, agent starts up, client sends second start command - -In each case, the client should be able to reconnect and send a start command diff --git a/server/server.py b/server/server.py deleted file mode 100644 index b4b91a86ea..0000000000 --- a/server/server.py +++ /dev/null @@ -1,182 +0,0 @@ -import json -import os -from time import sleep - -import docker -import websockets -from fastapi import FastAPI, WebSocket, WebSocketDisconnect -from starlette.websockets import WebSocketState - -app = FastAPI() - -CONTAINER_NAME = "devin-agent" - -AGENT_LISTEN_PORT = 8080 -AGENT_BIND_PORT = os.environ.get("AGENT_PORT", 4522) -MAX_WAIT_TIME_SECONDS = 30 - -agent_listener = None -client_fast_websocket = None -agent_websocket = None - -def get_message_payload(message): - return {"source": "server", "message": message} - -def get_error_payload(message): - payload = get_message_payload(message) - payload["error"] = True - return payload - -# This endpoint recieves events from the client (i.e. the browser) -@app.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - global client_fast_websocket - global agent_websocket - - await websocket.accept() - client_fast_websocket = websocket - - try: - while True: - data = await websocket.receive_json() - if "action" not in data: - await send_message_to_client(get_error_payload("No action specified")) - continue - action = data["action"] - if action == "start": - await send_message_to_client(get_message_payload("Starting new agent...")) - directory = os.getcwd() - if "directory" in data: - directory = data["directory"] - try: - await restart_docker_container(directory) - except Exception as e: - print("error while restarting docker container:", e) - await send_message_to_client(get_error_payload("Failed to start container: " + str(e))) - continue - - if action == "terminal": - msg = { - "action": "terminal", - "data": data["data"] - } - await send_message_to_client(get_message_payload(msg)) - else: - if agent_websocket is None: - await send_message_to_client(get_error_payload("Agent not connected")) - continue - - except WebSocketDisconnect: - print("Client websocket disconnected") - await close_all_websockets(get_error_payload("Client disconnected")) - -async def stop_docker_container(): - docker_client = docker.from_env() - try: - container = docker_client.containers.get(CONTAINER_NAME) - container.stop() - container.remove() - elapsed = 0 - while container.status != "exited": - print("waiting for container to stop...") - sleep(1) - elapsed += 1 - if elapsed > MAX_WAIT_TIME_SECONDS: - break - container = docker_client.containers.get(CONTAINER_NAME) - except docker.errors.NotFound: - pass - -async def restart_docker_container(directory): - await stop_docker_container() - docker_client = docker.from_env() - container = docker_client.containers.run( - "jmalloc/echo-server", - name=CONTAINER_NAME, - detach=True, - ports={str(AGENT_LISTEN_PORT) + "/tcp": AGENT_BIND_PORT}, - volumes={directory: {"bind": "/workspace", "mode": "rw"}}) - - # wait for container to be ready - elapsed = 0 - while container.status != "running": - if container.status == "exited": - print("container exited") - print("container logs:") - print(container.logs()) - break - print("waiting for container to start...") - sleep(1) - elapsed += 1 - container = docker_client.containers.get(CONTAINER_NAME) - if elapsed > MAX_WAIT_TIME_SECONDS: - break - if container.status != "running": - raise Exception("Failed to start container") - -async def listen_for_agent_messages(): - global agent_websocket - global client_fast_websocket - - try: - async with websockets.connect("ws://localhost:" + str(AGENT_BIND_PORT)) as ws: - agent_websocket = ws - await send_message_to_client(get_message_payload("Agent connected!")) - await send_message_to_agent({"source": "server", "message": "Hello, agent!"}) - try: - async for message in agent_websocket: - if client_fast_websocket is None: - print("Client websocket not connected") - await close_all_websockets(get_error_payload("Client not connected")) - break - try: - data = json.loads(message) - except Exception as e: - print("error parsing message from agent:", message) - print(e) - continue - if "source" not in data or data["source"] != "agent": - # TODO: remove this once we're not using echo server - print("echo server responded", data) - continue - await send_message_to_agent(data) - except websockets.exceptions.ConnectionClosed: - await send_message_to_client(get_error_payload("Agent disconnected")) - except Exception as e: - print("error connecting to agent:", e) - payload = get_error_payload("Failed to connect to agent: " + str(e)) - await send_message_to_client(payload) - await close_agent_websocket(payload) - -async def send_message_to_client(data): - print("to client:", data) - if client_fast_websocket is None: - return - await client_fast_websocket.send_json(data) - -async def send_message_to_agent(data): - print("to agent:", data) - if agent_websocket is None: - return - await agent_websocket.send(json.dumps(data)) - -async def close_agent_websocket(payload): - global agent_websocket - if agent_websocket is not None: - if not agent_websocket.closed: - await send_message_to_agent(payload) - await agent_websocket.close() - agent_websocket = None - await stop_docker_container() - -async def close_client_websocket(payload): - global client_fast_websocket - if client_fast_websocket is not None: - if client_fast_websocket.client_state != WebSocketState.DISCONNECTED: - await send_message_to_client(payload) - await client_fast_websocket.close() - client_fast_websocket = None - -async def close_all_websockets(payload): - await close_agent_websocket(payload) - await close_client_websocket(payload)