mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
handle exceptions more explicitly (#4971)
This commit is contained in:
parent
e052c25572
commit
c9ed9b166b
@ -18,6 +18,7 @@ from openhands.core.exceptions import (
|
||||
LLMNoActionError,
|
||||
LLMResponseError,
|
||||
)
|
||||
from openhands.core.logger import LOG_ALL_EVENTS
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.core.schema import AgentState
|
||||
from openhands.events import EventSource, EventStream, EventStreamSubscriber
|
||||
@ -528,8 +529,7 @@ class AgentController:
|
||||
|
||||
await self.update_state_after_step()
|
||||
|
||||
# Use info level if LOG_ALL_EVENTS is set
|
||||
log_level = 'info' if os.getenv('LOG_ALL_EVENTS') in ('true', '1') else 'debug'
|
||||
log_level = 'info' if LOG_ALL_EVENTS else 'debug'
|
||||
self.log(log_level, str(action), extra={'msg_type': 'ACTION'})
|
||||
|
||||
async def _delegate_step(self):
|
||||
|
||||
@ -17,6 +17,8 @@ if DEBUG:
|
||||
LOG_TO_FILE = os.getenv('LOG_TO_FILE', 'False').lower() in ['true', '1', 'yes']
|
||||
DISABLE_COLOR_PRINTING = False
|
||||
|
||||
LOG_ALL_EVENTS = os.getenv('LOG_ALL_EVENTS', 'False').lower() in ['true', '1', 'yes']
|
||||
|
||||
ColorType = Literal[
|
||||
'red',
|
||||
'green',
|
||||
@ -89,8 +91,11 @@ class ColoredFormatter(logging.Formatter):
|
||||
return f'{time_str} - {name_str}:{level_str}: {record.filename}:{record.lineno}\n{msg_type_color}\n{msg}'
|
||||
return f'{time_str} - {msg_type_color}\n{msg}'
|
||||
elif msg_type == 'STEP':
|
||||
msg = '\n\n==============\n' + record.msg + '\n'
|
||||
return f'{msg}'
|
||||
if LOG_ALL_EVENTS:
|
||||
msg = '\n\n==============\n' + record.msg + '\n'
|
||||
return f'{msg}'
|
||||
else:
|
||||
return record.msg
|
||||
return super().format(record)
|
||||
|
||||
|
||||
|
||||
@ -47,11 +47,19 @@ STATUS_MESSAGES = {
|
||||
}
|
||||
|
||||
|
||||
class RuntimeNotReadyError(Exception):
|
||||
class RuntimeUnavailableError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RuntimeDisconnectedError(Exception):
|
||||
class RuntimeNotReadyError(RuntimeUnavailableError):
|
||||
pass
|
||||
|
||||
|
||||
class RuntimeDisconnectedError(RuntimeUnavailableError):
|
||||
pass
|
||||
|
||||
|
||||
class RuntimeNotFoundError(RuntimeUnavailableError):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@ -34,7 +34,11 @@ from openhands.events.observation import (
|
||||
)
|
||||
from openhands.events.serialization import event_to_dict, observation_from_dict
|
||||
from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
|
||||
from openhands.runtime.base import Runtime
|
||||
from openhands.runtime.base import (
|
||||
Runtime,
|
||||
RuntimeDisconnectedError,
|
||||
RuntimeNotFoundError,
|
||||
)
|
||||
from openhands.runtime.builder import DockerRuntimeBuilder
|
||||
from openhands.runtime.impl.eventstream.containers import remove_all_containers
|
||||
from openhands.runtime.plugins import PluginRequirement
|
||||
@ -424,10 +428,22 @@ class EventStreamRuntime(Runtime):
|
||||
|
||||
@tenacity.retry(
|
||||
stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
|
||||
reraise=(ConnectionRefusedError,),
|
||||
retry=tenacity.retry_if_exception_type(
|
||||
(ConnectionError, requests.exceptions.ConnectionError)
|
||||
),
|
||||
reraise=True,
|
||||
wait=tenacity.wait_fixed(2),
|
||||
)
|
||||
def _wait_until_alive(self):
|
||||
try:
|
||||
container = self.docker_client.containers.get(self.container_name)
|
||||
if container.status == 'exited':
|
||||
raise RuntimeDisconnectedError(
|
||||
f'Container {self.container_name} has exited.'
|
||||
)
|
||||
except docker.errors.NotFound:
|
||||
raise RuntimeNotFoundError(f'Container {self.container_name} not found.')
|
||||
|
||||
self._refresh_logs()
|
||||
if not self.log_buffer:
|
||||
raise RuntimeError('Runtime client is not ready.')
|
||||
|
||||
@ -31,6 +31,7 @@ from openhands.events.serialization.action import ACTION_TYPE_TO_CLASS
|
||||
from openhands.runtime.base import (
|
||||
Runtime,
|
||||
RuntimeDisconnectedError,
|
||||
RuntimeNotFoundError,
|
||||
RuntimeNotReadyError,
|
||||
)
|
||||
from openhands.runtime.builder.remote import RemoteRuntimeBuilder
|
||||
@ -109,7 +110,9 @@ class RemoteRuntime(Runtime):
|
||||
if existing_runtime:
|
||||
self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
|
||||
elif self.attach_to_existing:
|
||||
raise RuntimeError('Could not find existing runtime to attach to.')
|
||||
raise RuntimeNotFoundError(
|
||||
f'Could not find existing runtime for SID: {self.sid}'
|
||||
)
|
||||
else:
|
||||
self.send_status_message('STATUS$STARTING_CONTAINER')
|
||||
if self.config.sandbox.runtime_container_image is None:
|
||||
|
||||
@ -34,6 +34,7 @@ from fastapi import (
|
||||
Request,
|
||||
UploadFile,
|
||||
WebSocket,
|
||||
WebSocketDisconnect,
|
||||
status,
|
||||
)
|
||||
from fastapi.responses import FileResponse, JSONResponse
|
||||
@ -238,7 +239,8 @@ async def attach_session(request: Request, call_next):
|
||||
request.state.conversation = await session_manager.attach_to_conversation(
|
||||
request.state.sid
|
||||
)
|
||||
if request.state.conversation is None:
|
||||
if not request.state.conversation:
|
||||
logger.error(f'Runtime not found for session: {request.state.sid}')
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
content={'error': 'Session not found'},
|
||||
@ -344,7 +346,13 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
|
||||
latest_event_id = -1
|
||||
if websocket.query_params.get('latest_event_id'):
|
||||
latest_event_id = int(websocket.query_params.get('latest_event_id'))
|
||||
try:
|
||||
latest_event_id = int(websocket.query_params.get('latest_event_id'))
|
||||
except ValueError:
|
||||
logger.warning(
|
||||
f'Invalid latest_event_id: {websocket.query_params.get("latest_event_id")}'
|
||||
)
|
||||
pass
|
||||
|
||||
async_stream = AsyncEventStreamWrapper(
|
||||
session.agent_session.event_stream, latest_event_id + 1
|
||||
@ -361,7 +369,14 @@ async def websocket_endpoint(websocket: WebSocket):
|
||||
),
|
||||
):
|
||||
continue
|
||||
await websocket.send_json(event_to_dict(event))
|
||||
try:
|
||||
await websocket.send_json(event_to_dict(event))
|
||||
except WebSocketDisconnect:
|
||||
logger.warning(
|
||||
'Websocket disconnected while sending event history, before loop started'
|
||||
)
|
||||
session.close()
|
||||
return
|
||||
|
||||
await session.loop_recv()
|
||||
|
||||
|
||||
@ -11,7 +11,7 @@ from openhands.events.action.agent import ChangeAgentStateAction
|
||||
from openhands.events.event import EventSource
|
||||
from openhands.events.stream import EventStream
|
||||
from openhands.runtime import get_runtime_cls
|
||||
from openhands.runtime.base import Runtime
|
||||
from openhands.runtime.base import Runtime, RuntimeUnavailableError
|
||||
from openhands.security import SecurityAnalyzer, options
|
||||
from openhands.storage.files import FileStore
|
||||
|
||||
@ -194,13 +194,13 @@ class AgentSession:
|
||||
|
||||
try:
|
||||
await self.runtime.connect()
|
||||
except Exception as e:
|
||||
except RuntimeUnavailableError as e:
|
||||
logger.error(f'Runtime initialization failed: {e}', exc_info=True)
|
||||
if self._status_callback:
|
||||
self._status_callback(
|
||||
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
|
||||
)
|
||||
raise
|
||||
return
|
||||
|
||||
if self.runtime is not None:
|
||||
logger.debug(
|
||||
|
||||
@ -6,6 +6,7 @@ from fastapi import WebSocket
|
||||
from openhands.core.config import AppConfig
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.events.stream import session_exists
|
||||
from openhands.runtime.base import RuntimeUnavailableError
|
||||
from openhands.server.session.conversation import Conversation
|
||||
from openhands.server.session.session import Session
|
||||
from openhands.storage.files import FileStore
|
||||
@ -26,7 +27,11 @@ class SessionManager:
|
||||
if not await session_exists(sid, self.file_store):
|
||||
return None
|
||||
c = Conversation(sid, file_store=self.file_store, config=self.config)
|
||||
await c.connect()
|
||||
try:
|
||||
await c.connect()
|
||||
except RuntimeUnavailableError as e:
|
||||
logger.error(f'Error connecting to conversation {c.sid}: {e}')
|
||||
return None
|
||||
end_time = time.time()
|
||||
logger.info(
|
||||
f'Conversation {c.sid} connected in {end_time - start_time} seconds'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user