Merge commit '1d052818ae51856c13e6d468ab79673747440ae5' into xw/diff-edit

This commit is contained in:
Xingyao Wang 2024-09-25 15:02:02 +00:00
commit 1e398d6362
29 changed files with 1070 additions and 188 deletions

View File

@ -145,8 +145,7 @@ jobs:
run: make install-python-dependencies
- name: Run runtime tests
run: |
# We install pytest-xdist in order to run tests across CPUs. However, tests start to fail when we run
# then across more than 2 CPUs for some reason
# We install pytest-xdist in order to run tests across CPUs
poetry run pip install pytest-xdist
# Install to be able to retry on failures for flaky tests
@ -158,10 +157,10 @@ jobs:
SKIP_CONTAINER_LOGS=true \
TEST_RUNTIME=eventstream \
SANDBOX_USER_ID=$(id -u) \
SANDBOX_BASE_CONTAINER_IMAGE=$image_name \
SANDBOX_RUNTIME_CONTAINER_IMAGE=$image_name \
TEST_IN_CI=true \
RUN_AS_OPENHANDS=false \
poetry run pytest -n 3 --reruns 1 --reruns-delay 3 --cov=agenthub --cov=openhands --cov-report=xml -s ./tests/runtime
poetry run pytest -n 3 -raR --reruns 1 --reruns-delay 3 --cov=agenthub --cov=openhands --cov-report=xml -s ./tests/runtime
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
@ -207,8 +206,7 @@ jobs:
run: make install-python-dependencies
- name: Run runtime tests
run: |
# We install pytest-xdist in order to run tests across CPUs. However, tests start to fail when we run
# then across more than 2 CPUs for some reason
# We install pytest-xdist in order to run tests across CPUs
poetry run pip install pytest-xdist
# Install to be able to retry on failures for flaky tests
@ -220,10 +218,10 @@ jobs:
SKIP_CONTAINER_LOGS=true \
TEST_RUNTIME=eventstream \
SANDBOX_USER_ID=$(id -u) \
SANDBOX_BASE_CONTAINER_IMAGE=$image_name \
SANDBOX_RUNTIME_CONTAINER_IMAGE=$image_name \
TEST_IN_CI=true \
RUN_AS_OPENHANDS=true \
poetry run pytest -n 3 --reruns 1 --reruns-delay 3 --cov=agenthub --cov=openhands --cov-report=xml -s ./tests/runtime
poetry run pytest -n 3 -raR --reruns 1 --reruns-delay 3 --cov=agenthub --cov=openhands --cov-report=xml -s ./tests/runtime
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
env:
@ -275,7 +273,7 @@ jobs:
TEST_RUNTIME=eventstream \
SANDBOX_USER_ID=$(id -u) \
SANDBOX_BASE_CONTAINER_IMAGE=$image_name \
SANDBOX_RUNTIME_CONTAINER_IMAGE=$image_name \
TEST_IN_CI=true \
TEST_ONLY=true \
./tests/integration/regenerate.sh

1
.gitignore vendored
View File

@ -228,3 +228,4 @@ runtime_*.tar
# docker build
containers/runtime/Dockerfile
containers/runtime/project.tar.gz
containers/runtime/code

View File

@ -1,11 +1,12 @@
# Dynamic constructed Dockerfile
# Dynamically constructed Dockerfile
This folder builds runtime image (sandbox), which will use a `Dockerfile` that is dynamically generated depends on the `base_image` AND a [Python source distribution](https://docs.python.org/3.10/distutils/sourcedist.html) that's based on the current commit of `openhands`.
This folder builds a runtime image (sandbox), which will use a dynamically generated `Dockerfile`
that depends on the `base_image` **AND** a [Python source distribution](https://docs.python.org/3.10/distutils/sourcedist.html) that is based on the current commit of `openhands`.
The following command will generate Dockerfile for `ubuntu:22.04` and the source distribution `.tar` into `containers/runtime`.
The following command will generate a `Dockerfile` file for `nikolaik/python-nodejs:python3.11-nodejs22` (the default base image), an updated `config.sh` and the runtime source distribution files/folders into `containers/runtime`:
```bash
poetry run python3 openhands/runtime/utils/runtime_build.py \
--base_image ubuntu:22.04 \
--base_image nikolaik/python-nodejs:python3.11-nodejs22 \
--build_folder containers/runtime
```

View File

@ -6,7 +6,6 @@ import pathlib
import subprocess
import time
import traceback
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Any, Awaitable, Callable, TextIO
import pandas as pd
@ -302,6 +301,11 @@ def _process_instance_wrapper(
time.sleep(5)
def _process_instance_wrapper_mp(args):
"""Wrapper for multiprocessing, especially for imap_unordered."""
return _process_instance_wrapper(*args)
def run_evaluation(
dataset: pd.DataFrame,
metadata: EvalMetadata | None,
@ -328,20 +332,13 @@ def run_evaluation(
try:
if use_multiprocessing:
with ProcessPoolExecutor(num_workers) as executor:
futures = [
executor.submit(
_process_instance_wrapper,
process_instance_func=process_instance_func,
instance=instance,
metadata=metadata,
use_mp=True,
max_retries=max_retries,
)
with mp.Pool(num_workers) as pool:
args_iter = (
(process_instance_func, instance, metadata, True, max_retries)
for _, instance in dataset.iterrows()
]
for future in as_completed(futures):
result = future.result()
)
results = pool.imap_unordered(_process_instance_wrapper_mp, args_iter)
for result in results:
update_progress(result, pbar, output_fp)
else:
for _, instance in dataset.iterrows():

View File

@ -41,7 +41,7 @@
"@testing-library/jest-dom": "^6.5.0",
"@testing-library/react": "^16.0.1",
"@testing-library/user-event": "^14.5.2",
"@types/node": "^22.5.5",
"@types/node": "^22.6.1",
"@types/react": "^18.3.8",
"@types/react-dom": "^18.3.0",
"@types/react-highlight": "^0.12.8",
@ -4860,9 +4860,9 @@
"integrity": "sha512-nG96G3Wp6acyAgJqGasjODb+acrI7KltPiRxzHPXnP3NgI28bpQDRv53olbqGXbfcgF5aiiHmO3xpwEpS5Ld9g=="
},
"node_modules/@types/node": {
"version": "22.5.5",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.5.5.tgz",
"integrity": "sha512-Xjs4y5UPO/CLdzpgR6GirZJx36yScjh73+2NlLlkFRSoQN8B0DpfXPdZGnvVmLRLOsqDpOfTNv7D9trgGhmOIA==",
"version": "22.6.1",
"resolved": "https://registry.npmjs.org/@types/node/-/node-22.6.1.tgz",
"integrity": "sha512-V48tCfcKb/e6cVUigLAaJDAILdMP0fUW6BidkPK4GpGjXcfbnoHasCZDwz3N3yVt5we2RHm4XTQCpv0KJz9zqw==",
"devOptional": true,
"dependencies": {
"undici-types": "~6.19.2"

View File

@ -64,7 +64,7 @@
"@testing-library/jest-dom": "^6.5.0",
"@testing-library/react": "^16.0.1",
"@testing-library/user-event": "^14.5.2",
"@types/node": "^22.5.5",
"@types/node": "^22.6.1",
"@types/react": "^18.3.8",
"@types/react-dom": "^18.3.0",
"@types/react-highlight": "^0.12.8",

View File

@ -18,6 +18,7 @@ enum IndicatorColor {
function AgentStatusBar() {
const { t } = useTranslation();
const { curAgentState } = useSelector((state: RootState) => state.agent);
const { curStatusMessage } = useSelector((state: RootState) => state.status);
const AgentStatusMap: {
[k: string]: { message: string; indicator: IndicatorColor };
@ -90,14 +91,25 @@ function AgentStatusBar() {
}
}, [curAgentState]);
const [statusMessage, setStatusMessage] = React.useState<string>("");
React.useEffect(() => {
const trimmedCustomMessage = curStatusMessage.message.trim();
if (trimmedCustomMessage) {
setStatusMessage(t(trimmedCustomMessage));
} else {
setStatusMessage(AgentStatusMap[curAgentState].message);
}
}, [curAgentState, curStatusMessage.message]);
return (
<div className="flex items-center">
<div
className={`w-3 h-3 mr-2 rounded-full animate-pulse ${AgentStatusMap[curAgentState].indicator}`}
/>
<span className="text-sm text-stone-400">
{AgentStatusMap[curAgentState].message}
</span>
<div className="flex flex-col items-center">
<div className="flex items-center">
<div
className={`w-3 h-3 mr-2 rounded-full animate-pulse ${AgentStatusMap[curAgentState].indicator}`}
/>
<span className="text-sm text-stone-400">{statusMessage}</span>
</div>
</div>
);
}

View File

@ -52,13 +52,17 @@ function SettingsForm({
const [enableAdvanced, setEnableAdvanced] =
React.useState(advancedAlreadyInUse);
const handleAdvancedChange = (value: boolean) => {
setEnableAdvanced(value);
};
return (
<>
<Switch
data-testid="advanced-options-toggle"
aria-checked={enableAdvanced}
isSelected={enableAdvanced}
onValueChange={(value) => setEnableAdvanced(value)}
onValueChange={handleAdvancedChange}
>
Advanced Options
</Switch>

File diff suppressed because it is too large Load Diff

View File

@ -6,10 +6,11 @@ import {
ActionSecurityRisk,
appendSecurityAnalyzerInput,
} from "#/state/securityAnalyzerSlice";
import { setCurStatusMessage } from "#/state/statusSlice";
import { setRootTask } from "#/state/taskSlice";
import store from "#/store";
import ActionType from "#/types/ActionType";
import { ActionMessage } from "#/types/Message";
import { ActionMessage, StatusMessage } from "#/types/Message";
import { SocketMessage } from "#/types/ResponseType";
import { handleObservationMessage } from "./observations";
import { getRootTask } from "./taskService";
@ -138,6 +139,16 @@ export function handleActionMessage(message: ActionMessage) {
}
}
export function handleStatusMessage(message: StatusMessage) {
const msg = message.message == null ? "" : message.message.trim();
store.dispatch(
setCurStatusMessage({
...message,
message: msg,
}),
);
}
export function handleAssistantMessage(data: string | SocketMessage) {
let socketMessage: SocketMessage;
@ -149,7 +160,9 @@ export function handleAssistantMessage(data: string | SocketMessage) {
if ("action" in socketMessage) {
handleActionMessage(socketMessage);
} else {
} else if ("observation" in socketMessage) {
handleObservationMessage(socketMessage);
} else if ("message" in socketMessage) {
handleStatusMessage(socketMessage);
}
}

View File

@ -8,11 +8,19 @@ import { I18nKey } from "#/i18n/declaration";
const translate = (key: I18nKey) => i18next.t(key);
// Define a type for the messages
type Message = {
action: ActionType;
args: Record<string, unknown>;
};
class Session {
private static _socket: WebSocket | null = null;
private static _latest_event_id: number = -1;
private static _messageQueue: Message[] = [];
public static _history: Record<string, unknown>[] = [];
// callbacks contain a list of callable functions
@ -83,6 +91,7 @@ class Session {
toast.success("ws", translate(I18nKey.SESSION$SERVER_CONNECTED_MESSAGE));
Session._connecting = false;
Session._initializeAgent();
Session._flushQueue();
Session.callbacks.open?.forEach((callback) => {
callback(e);
});
@ -94,7 +103,6 @@ class Session {
data = JSON.parse(e.data);
Session._history.push(data);
} catch (err) {
// TODO: report the error
toast.error(
"ws",
translate(I18nKey.SESSION$SESSION_HANDLING_ERROR_MESSAGE),
@ -115,6 +123,7 @@ class Session {
};
Session._socket.onerror = () => {
// TODO report error
toast.error(
"ws",
translate(I18nKey.SESSION$SESSION_CONNECTION_ERROR_MESSAGE),
@ -145,9 +154,20 @@ class Session {
Session._socket = null;
}
private static _flushQueue(): void {
while (Session._messageQueue.length > 0) {
const message = Session._messageQueue.shift();
if (message) {
setTimeout(() => Session.send(JSON.stringify(message)), 1000);
}
}
}
static send(message: string): void {
const messageObject: Message = JSON.parse(message);
if (Session._connecting) {
setTimeout(() => Session.send(message), 1000);
Session._messageQueue.push(messageObject);
return;
}
if (!Session.isConnected()) {

View File

@ -87,10 +87,10 @@ export const getSettings = (): Settings => {
export const saveSettings = (settings: Partial<Settings>) => {
Object.keys(settings).forEach((key) => {
const isValid = validKeys.includes(key as keyof Settings);
const value = settings[key as keyof Settings];
if (isValid && typeof value !== "undefined")
localStorage.setItem(key, value.toString());
if (!isValid) return;
let value = settings[key as keyof Settings];
if (value === undefined || value === null) value = "";
localStorage.setItem(key, value.toString());
});
localStorage.setItem("SETTINGS_VERSION", LATEST_SETTINGS_VERSION.toString());
};

View File

@ -0,0 +1,23 @@
import { createSlice, PayloadAction } from "@reduxjs/toolkit";
import { StatusMessage } from "#/types/Message";
const initialStatusMessage: StatusMessage = {
message: "",
is_error: false,
};
export const statusSlice = createSlice({
name: "status",
initialState: {
curStatusMessage: initialStatusMessage,
},
reducers: {
setCurStatusMessage: (state, action: PayloadAction<StatusMessage>) => {
state.curStatusMessage = action.payload;
},
},
});
export const { setCurStatusMessage } = statusSlice.actions;
export default statusSlice.reducer;

View File

@ -8,6 +8,7 @@ import errorsReducer from "./state/errorsSlice";
import taskReducer from "./state/taskSlice";
import jupyterReducer from "./state/jupyterSlice";
import securityAnalyzerReducer from "./state/securityAnalyzerSlice";
import statusReducer from "./state/statusSlice";
export const rootReducer = combineReducers({
browser: browserReducer,
@ -19,6 +20,7 @@ export const rootReducer = combineReducers({
agent: agentReducer,
jupyter: jupyterReducer,
securityAnalyzer: securityAnalyzerReducer,
status: statusReducer,
});
const store = configureStore({

View File

@ -31,3 +31,12 @@ export interface ObservationMessage {
// The timestamp of the message
timestamp: string;
}
export interface StatusMessage {
// TODO not implemented yet
// Whether the status is an error, default is false
is_error: boolean;
// A status message to display to the user
message: string;
}

View File

@ -1,5 +1,5 @@
import { ActionMessage, ObservationMessage } from "./Message";
import { ActionMessage, ObservationMessage, StatusMessage } from "./Message";
type SocketMessage = ActionMessage | ObservationMessage;
type SocketMessage = ActionMessage | ObservationMessage | StatusMessage;
export { type SocketMessage };

View File

@ -54,7 +54,7 @@ class AgentController:
confirmation_mode: bool
agent_to_llm_config: dict[str, LLMConfig]
agent_configs: dict[str, AgentConfig]
agent_task: asyncio.Task | None = None
agent_task: asyncio.Future | None = None
parent: 'AgentController | None' = None
delegate: 'AgentController | None' = None
_pending_action: Action | None = None
@ -115,9 +115,6 @@ class AgentController:
# stuck helper
self._stuck_detector = StuckDetector(self.state)
if not is_delegate:
self.agent_task = asyncio.create_task(self._start_step_loop())
async def close(self):
"""Closes the agent controller, canceling any ongoing tasks and unsubscribing from the event stream."""
if self.agent_task is not None:
@ -149,7 +146,7 @@ class AgentController:
self.state.last_error += f': {exception}'
self.event_stream.add_event(ErrorObservation(message), EventSource.AGENT)
async def _start_step_loop(self):
async def start_step_loop(self):
"""The main loop for the agent's step-by-step execution."""
logger.info(f'[Agent Controller {self.id}] Starting step loop...')

View File

@ -121,6 +121,9 @@ async def main():
event_stream=event_stream,
)
if controller is not None:
controller.agent_task = asyncio.create_task(controller.start_step_loop())
async def prompt_for_next_task():
next_message = input('How can I help? >> ')
if next_message == 'exit':

View File

@ -55,7 +55,6 @@ def create_runtime(
config: The app config.
sid: The session id.
runtime_tools_config: (will be deprecated) The runtime tools config.
"""
# if sid is provided on the command line, use it as the name of the event stream
# otherwise generate it on the basis of the configured jwt_secret
@ -144,6 +143,9 @@ async def run_controller(
headless_mode=headless_mode,
)
if controller is not None:
controller.agent_task = asyncio.create_task(controller.start_step_loop())
assert isinstance(task_str, str), f'task_str must be a string, got {type(task_str)}'
# Logging
logger.info(

View File

@ -16,8 +16,10 @@ from pathlib import Path
import pexpect
from fastapi import FastAPI, HTTPException, Request, UploadFile
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from starlette.exceptions import HTTPException as StarletteHTTPException
from uvicorn import run
from openhands.core.logger import openhands_logger as logger
@ -622,6 +624,35 @@ if __name__ == '__main__':
app = FastAPI(lifespan=lifespan)
# TODO below 3 exception handlers were recommended by Sonnet.
# Are these something we should keep?
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.exception('Unhandled exception occurred:')
return JSONResponse(
status_code=500,
content={
'message': 'An unexpected error occurred. Please try again later.'
},
)
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
logger.error(f'HTTP exception occurred: {exc.detail}')
return JSONResponse(
status_code=exc.status_code, content={'message': exc.detail}
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(
request: Request, exc: RequestValidationError
):
logger.error(f'Validation error occurred: {exc}')
return JSONResponse(
status_code=422,
content={'message': 'Invalid request parameters', 'details': exc.errors()},
)
@app.middleware('http')
async def one_request_at_a_time(request: Request, call_next):
assert client is not None

View File

@ -2,6 +2,7 @@ import os
import tempfile
import threading
import uuid
from typing import Callable
from zipfile import ZipFile
import docker
@ -120,6 +121,7 @@ class EventStreamRuntime(Runtime):
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_message_callback: Callable | None = None,
):
self.config = config
self._host_port = 30000 # initial dummy value
@ -131,12 +133,13 @@ class EventStreamRuntime(Runtime):
self.instance_id = (
sid + '_' + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
)
self.status_message_callback = status_message_callback
self.send_status_message('STATUS$STARTING_RUNTIME')
self.docker_client: docker.DockerClient = self._init_docker_client()
self.base_container_image = self.config.sandbox.base_container_image
self.runtime_container_image = self.config.sandbox.runtime_container_image
self.container_name = self.container_name_prefix + self.instance_id
self.container = None
self.action_semaphore = threading.Semaphore(1) # Ensure one action at a time
@ -147,9 +150,10 @@ class EventStreamRuntime(Runtime):
self.log_buffer: LogBuffer | None = None
if self.config.sandbox.runtime_extra_deps:
logger.info(
logger.debug(
f'Installing extra user-provided dependencies in the runtime image: {self.config.sandbox.runtime_extra_deps}'
)
self.skip_container_logs = (
os.environ.get('SKIP_CONTAINER_LOGS', 'false').lower() == 'true'
)
@ -158,6 +162,8 @@ class EventStreamRuntime(Runtime):
raise ValueError(
'Neither runtime container image nor base container image is set'
)
logger.info('Preparing container, this might take a few minutes...')
self.send_status_message('STATUS$STARTING_CONTAINER')
self.runtime_container_image = build_runtime_image(
self.base_container_image,
self.runtime_builder,
@ -170,9 +176,13 @@ class EventStreamRuntime(Runtime):
)
# will initialize both the event stream and the env vars
super().__init__(config, event_stream, sid, plugins, env_vars)
super().__init__(
config, event_stream, sid, plugins, env_vars, status_message_callback
)
logger.info('Waiting for client to become ready...')
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
logger.info('Waiting for runtime container to be alive...')
self._wait_until_alive()
self.setup_initial_env()
@ -180,6 +190,7 @@ class EventStreamRuntime(Runtime):
logger.info(
f'Container initialized with plugins: {[plugin.name for plugin in self.plugins]}'
)
self.send_status_message(' ')
@staticmethod
def _init_docker_client() -> docker.DockerClient:
@ -202,9 +213,8 @@ class EventStreamRuntime(Runtime):
plugins: list[PluginRequirement] | None = None,
):
try:
logger.info(
f'Starting container with image: {self.runtime_container_image} and name: {self.container_name}'
)
logger.info('Preparing to start container...')
self.send_status_message('STATUS$PREPARING_CONTAINER')
plugin_arg = ''
if plugins is not None and len(plugins) > 0:
plugin_arg = (
@ -242,17 +252,17 @@ class EventStreamRuntime(Runtime):
if self.config.debug:
environment['DEBUG'] = 'true'
logger.info(f'Workspace Base: {self.config.workspace_base}')
logger.debug(f'Workspace Base: {self.config.workspace_base}')
if mount_dir is not None and sandbox_workspace_dir is not None:
# e.g. result would be: {"/home/user/openhands/workspace": {'bind': "/workspace", 'mode': 'rw'}}
volumes = {mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}}
logger.info(f'Mount dir: {mount_dir}')
logger.debug(f'Mount dir: {mount_dir}')
else:
logger.warn(
'Warning: Mount dir is not set, will not mount the workspace directory to the container!\n'
)
volumes = None
logger.info(f'Sandbox workspace: {sandbox_workspace_dir}')
logger.debug(f'Sandbox workspace: {sandbox_workspace_dir}')
if self.config.sandbox.browsergym_eval_env is not None:
browsergym_arg = (
@ -260,6 +270,7 @@ class EventStreamRuntime(Runtime):
)
else:
browsergym_arg = ''
container = self.docker_client.containers.run(
self.runtime_container_image,
command=(
@ -282,6 +293,7 @@ class EventStreamRuntime(Runtime):
)
self.log_buffer = LogBuffer(container)
logger.info(f'Container started. Server url: {self.api_url}')
self.send_status_message('STATUS$CONTAINER_STARTED')
return container
except Exception as e:
logger.error(
@ -543,3 +555,8 @@ class EventStreamRuntime(Runtime):
return port
# If no port is found after max_attempts, return the last tried port
return port
def send_status_message(self, message: str):
"""Sends a status message if the callback function was provided."""
if self.status_message_callback:
self.status_message_callback(message)

View File

@ -1,3 +1,5 @@
from typing import Callable, Optional
from openhands.core.config import AppConfig
from openhands.events.action import (
FileReadAction,
@ -25,8 +27,15 @@ class E2BRuntime(Runtime):
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
sandbox: E2BSandbox | None = None,
status_message_callback: Optional[Callable] = None,
):
super().__init__(config, event_stream, sid, plugins)
super().__init__(
config,
event_stream,
sid,
plugins,
status_message_callback=status_message_callback,
)
if sandbox is None:
self.sandbox = E2BSandbox()
if not isinstance(self.sandbox, E2BSandbox):

View File

@ -2,6 +2,7 @@ import os
import tempfile
import threading
import uuid
from typing import Callable, Optional
from zipfile import ZipFile
import requests
@ -55,6 +56,7 @@ class RemoteRuntime(Runtime):
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_message_callback: Optional[Callable] = None,
):
self.config = config
if self.config.sandbox.api_hostname == 'localhost':
@ -168,7 +170,9 @@ class RemoteRuntime(Runtime):
)
# Initialize the eventstream and env vars
super().__init__(config, event_stream, sid, plugins, env_vars)
super().__init__(
config, event_stream, sid, plugins, env_vars, status_message_callback
)
logger.info(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.plugins]}'

View File

@ -3,6 +3,7 @@ import copy
import json
import os
from abc import abstractmethod
from typing import Callable
from openhands.core.config import AppConfig, SandboxConfig
from openhands.core.logger import openhands_logger as logger
@ -59,11 +60,13 @@ class Runtime:
sid: str = 'default',
plugins: list[PluginRequirement] | None = None,
env_vars: dict[str, str] | None = None,
status_message_callback: Callable | None = None,
):
self.sid = sid
self.event_stream = event_stream
self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
self.plugins = plugins if plugins is not None and len(plugins) > 0 else []
self.status_message_callback = status_message_callback
self.config = copy.deepcopy(config)
atexit.register(self.close)

View File

@ -1,3 +1,8 @@
import asyncio
from threading import Thread
from typing import Callable, Optional
from openhands.controller import AgentController
from openhands.controller.agent import Agent
from openhands.controller.state.state import State
@ -46,9 +51,9 @@ class AgentSession:
max_budget_per_task: float | None = None,
agent_to_llm_config: dict[str, LLMConfig] | None = None,
agent_configs: dict[str, AgentConfig] | None = None,
status_message_callback: Optional[Callable] = None,
):
"""Starts the Agent session
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
@ -58,14 +63,18 @@ class AgentSession:
- agent_to_llm_config:
- agent_configs:
"""
if self.controller or self.runtime:
raise RuntimeError(
'Session already started. You need to close this session and start a new one.'
)
await self._create_security_analyzer(config.security.security_analyzer)
await self._create_runtime(runtime_name, config, agent)
await self._create_controller(
self.loop = asyncio.new_event_loop()
self.thread = Thread(target=self._run, daemon=True)
self.thread.start()
self._create_security_analyzer(config.security.security_analyzer)
self._create_runtime(runtime_name, config, agent, status_message_callback)
self._create_controller(
agent,
config.security.confirmation_mode,
max_iterations,
@ -73,6 +82,13 @@ class AgentSession:
agent_to_llm_config=agent_to_llm_config,
agent_configs=agent_configs,
)
if self.controller is not None:
self.controller.agent_task = asyncio.run_coroutine_threadsafe(self.controller.start_step_loop(), self.loop) # type: ignore
def _run(self):
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
async def close(self):
"""Closes the Agent session"""
@ -87,22 +103,32 @@ class AgentSession:
self.runtime.close()
if self.security_analyzer is not None:
await self.security_analyzer.close()
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
self._closed = True
async def _create_security_analyzer(self, security_analyzer: str | None):
def _create_security_analyzer(self, security_analyzer: str | None):
"""Creates a SecurityAnalyzer instance that will be used to analyze the agent actions
Parameters:
- security_analyzer: The name of the security analyzer to use
"""
logger.info(f'Using security analyzer: {security_analyzer}')
if security_analyzer:
logger.debug(f'Using security analyzer: {security_analyzer}')
self.security_analyzer = options.SecurityAnalyzers.get(
security_analyzer, SecurityAnalyzer
)(self.event_stream)
async def _create_runtime(self, runtime_name: str, config: AppConfig, agent: Agent):
def _create_runtime(
self,
runtime_name: str,
config: AppConfig,
agent: Agent,
status_message_callback: Optional[Callable] = None,
):
"""Creates a runtime instance
Parameters:
@ -112,18 +138,27 @@ class AgentSession:
"""
if self.runtime is not None:
raise Exception('Runtime already created')
raise RuntimeError('Runtime already created')
logger.info(f'Initializing runtime `{runtime_name}` now...')
runtime_cls = get_runtime_cls(runtime_name)
self.runtime = runtime_cls(
config=config,
event_stream=self.event_stream,
sid=self.sid,
plugins=agent.sandbox_plugins,
status_message_callback=status_message_callback,
)
async def _create_controller(
if self.runtime is not None:
logger.debug(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
)
else:
logger.warning('Runtime initialization failed')
def _create_controller(
self,
agent: Agent,
confirmation_mode: bool,
@ -178,5 +213,5 @@ class AgentSession:
)
logger.info(f'Restored agent state from session, sid: {self.sid}')
except Exception as e:
logger.info(f'Error restoring state: {e}')
logger.info(f'State could not be restored: {e}')
logger.info('Agent controller initialized.')

View File

@ -35,9 +35,11 @@ class SessionManager:
async def send(self, sid: str, data: dict[str, object]) -> bool:
"""Sends data to the client."""
if sid not in self._sessions:
session = self.get_session(sid)
if session is None:
logger.error(f'*** No session found for {sid}, skipping message ***')
return False
return await self._sessions[sid].send(data)
return await session.send(data)
async def send_error(self, sid: str, message: str) -> bool:
"""Sends an error message to the client."""

View File

@ -21,7 +21,7 @@ from openhands.events.serialization import event_from_dict, event_to_dict
from openhands.events.stream import EventStreamSubscriber
from openhands.llm.llm import LLM
from openhands.runtime.utils.shutdown_listener import should_continue
from openhands.server.session.agent import AgentSession
from openhands.server.session.agent_session import AgentSession
from openhands.storage.files import FileStore
DEL_DELT_SEC = 60 * 60 * 5
@ -33,6 +33,7 @@ class Session:
last_active_ts: int = 0
is_alive: bool = True
agent_session: AgentSession
loop: asyncio.AbstractEventLoop
def __init__(
self, sid: str, ws: WebSocket | None, config: AppConfig, file_store: FileStore
@ -45,6 +46,7 @@ class Session:
EventStreamSubscriber.SERVER, self.on_event
)
self.config = config
self.loop = asyncio.get_event_loop()
async def close(self):
self.is_alive = False
@ -76,9 +78,7 @@ class Session:
AgentStateChangedObservation('', AgentState.LOADING), EventSource.AGENT
)
# Extract the agent-relevant arguments from the request
args = {
key: value for key, value in data.get('args', {}).items() if value != ''
}
args = {key: value for key, value in data.get('args', {}).items()}
agent_cls = args.get(ConfigType.AGENT, self.config.default_agent)
self.config.security.confirmation_mode = args.get(
ConfigType.CONFIRMATION_MODE, self.config.security.confirmation_mode
@ -115,6 +115,7 @@ class Session:
max_budget_per_task=self.config.max_budget_per_task,
agent_to_llm_config=self.config.get_agent_to_llm_config_map(),
agent_configs=self.config.get_agent_configs(),
status_message_callback=self.queue_status_message,
)
except Exception as e:
logger.exception(f'Error creating controller: {e}')
@ -127,7 +128,8 @@ class Session:
)
async def on_event(self, event: Event):
"""Callback function for agent events.
"""Callback function for events that mainly come from the agent.
Event is the base class for any agent action and observation.
Args:
event: The agent event (Observation or Action).
@ -137,7 +139,6 @@ class Session:
if isinstance(event, NullObservation):
return
if event.source == EventSource.AGENT:
logger.info('Server event')
await self.send(event_to_dict(event))
elif event.source == EventSource.USER and isinstance(
event, CmdOutputObservation
@ -174,6 +175,9 @@ class Session:
await asyncio.sleep(0.001) # This flushes the data to the client
self.last_active_ts = int(time.time())
return True
except RuntimeError:
self.is_alive = False
return False
except WebSocketDisconnect:
self.is_alive = False
return False
@ -197,3 +201,8 @@ class Session:
return False
self.is_alive = data.get('is_alive', False)
return True
def queue_status_message(self, message: str):
"""Queues a status message to be sent asynchronously."""
# Ensure the coroutine runs in the main event loop
asyncio.run_coroutine_threadsafe(self.send_message(message), self.loop)

32
poetry.lock generated
View File

@ -571,17 +571,17 @@ files = [
[[package]]
name = "boto3"
version = "1.35.24"
version = "1.35.25"
description = "The AWS SDK for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "boto3-1.35.24-py3-none-any.whl", hash = "sha256:97fcc1a14cbc759e4ba9535ced703a99fcf652c9c4b8dfcd06f292c80551684b"},
{file = "boto3-1.35.24.tar.gz", hash = "sha256:be7807f30f26d6c0057e45cfd09dad5968e664488bf4f9138d0bb7a0f6d8ed40"},
{file = "boto3-1.35.25-py3-none-any.whl", hash = "sha256:b1cfad301184cdd44dfd4805187ccab12de8dd28dd12a11a5cfdace17918c6de"},
{file = "boto3-1.35.25.tar.gz", hash = "sha256:5df4e2cbe3409db07d3a0d8d63d5220ce3202a78206ad87afdbb41519b26ce45"},
]
[package.dependencies]
botocore = ">=1.35.24,<1.36.0"
botocore = ">=1.35.25,<1.36.0"
jmespath = ">=0.7.1,<2.0.0"
s3transfer = ">=0.10.0,<0.11.0"
@ -590,13 +590,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"]
[[package]]
name = "botocore"
version = "1.35.24"
version = "1.35.25"
description = "Low-level, data-driven core of boto 3."
optional = false
python-versions = ">=3.8"
files = [
{file = "botocore-1.35.24-py3-none-any.whl", hash = "sha256:eb9ccc068255cc3d24c36693fda6aec7786db05ae6c2b13bcba66dce6a13e2e3"},
{file = "botocore-1.35.24.tar.gz", hash = "sha256:1e59b0f14f4890c4f70bd6a58a634b9464bed1c4c6171f87c8795d974ade614b"},
{file = "botocore-1.35.25-py3-none-any.whl", hash = "sha256:e58d60260abf10ccc4417967923117c9902a6a0cff9fddb6ea7ff42dc1bd4630"},
{file = "botocore-1.35.25.tar.gz", hash = "sha256:76c5706b2c6533000603ae8683a297c887abbbaf6ee31e1b2e2863b74b2989bc"},
]
[package.dependencies]
@ -3762,13 +3762,13 @@ types-tqdm = "*"
[[package]]
name = "litellm"
version = "1.47.1"
version = "1.48.0"
description = "Library to easily interface with LLM API providers"
optional = false
python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,>=3.8"
files = [
{file = "litellm-1.47.1-py3-none-any.whl", hash = "sha256:baa1961287ee398c937e8a5ecd1fcb821ea8f91cbd1f4757b6c19d7bcc84d4fd"},
{file = "litellm-1.47.1.tar.gz", hash = "sha256:51d1eb353573ddeac75c45b66147f533f64f231540667ea30b63edb9a2af15ce"},
{file = "litellm-1.48.0-py3-none-any.whl", hash = "sha256:7765e8a92069778f5fc66aacfabd0e2f8ec8d74fb117f5e475567d89b0d376b9"},
{file = "litellm-1.48.0.tar.gz", hash = "sha256:31a9b8a25a9daf44c24ddc08bf74298da920f2c5cea44135e5061278d0aa6fc9"},
]
[package.dependencies]
@ -4539,13 +4539,13 @@ files = [
[[package]]
name = "minio"
version = "7.2.8"
version = "7.2.9"
description = "MinIO Python SDK for Amazon S3 Compatible Cloud Storage"
optional = false
python-versions = ">3.8"
files = [
{file = "minio-7.2.8-py3-none-any.whl", hash = "sha256:aa3b485788b63b12406a5798465d12a57e4be2ac2a58a8380959b6b748e64ddd"},
{file = "minio-7.2.8.tar.gz", hash = "sha256:f8af2dafc22ebe1aef3ac181b8e217037011c430aa6da276ed627e55aaf7c815"},
{file = "minio-7.2.9-py3-none-any.whl", hash = "sha256:fe5523d9c4a4d6cfc07e96905852841bccdb22b22770e1efca4bf5ae8b65774b"},
{file = "minio-7.2.9.tar.gz", hash = "sha256:a83c2fcd981944602a8dc11e8e07543ed9cda0a9462264e3f46a13171c56bccb"},
]
[package.dependencies]
@ -5365,13 +5365,13 @@ sympy = "*"
[[package]]
name = "openai"
version = "1.47.0"
version = "1.47.1"
description = "The official Python library for the openai API"
optional = false
python-versions = ">=3.7.1"
files = [
{file = "openai-1.47.0-py3-none-any.whl", hash = "sha256:9ccc8737dfa791f7bd903db4758c176b8544a8cd89d3a3d2add3cea02a34c3a0"},
{file = "openai-1.47.0.tar.gz", hash = "sha256:6e14d6f77c8cf546646afcd87a2ef752505b3710d2564a2e433e17307dfa86a0"},
{file = "openai-1.47.1-py3-none-any.whl", hash = "sha256:34277583bf268bb2494bc03f48ac123788c5e2a914db1d5a23d5edc29d35c825"},
{file = "openai-1.47.1.tar.gz", hash = "sha256:62c8f5f478f82ffafc93b33040f8bb16a45948306198bd0cba2da2ecd9cf7323"},
]
[package.dependencies]

View File

@ -243,6 +243,7 @@ def _load_runtime(
if base_container_image is not None:
config.sandbox.base_container_image = base_container_image
config.sandbox.runtime_container_image = None
file_store = get_file_store(config.file_store, config.file_store_path)
event_stream = EventStream(sid, file_store)