Add system event listeners for monitoring (#6929)

This commit is contained in:
Ray Myers
2025-02-26 19:37:21 -06:00
committed by GitHub
parent 8b234ae57c
commit 34febafae4
10 changed files with 114 additions and 13 deletions

View File

@@ -17,6 +17,7 @@ class ServerConfig(ServerConfigInterface):
'openhands.storage.conversation.file_conversation_store.FileConversationStore'
)
conversation_manager_class: str = 'openhands.server.conversation_manager.standalone_conversation_manager.StandaloneConversationManager'
monitoring_listener_class: str = 'openhands.server.monitoring.MonitoringListener'
def verify_config(self):
if self.config_cls:

View File

@@ -11,6 +11,7 @@ from openhands.core.logger import openhands_logger as logger
from openhands.core.schema.agent import AgentState
from openhands.events.action import MessageAction
from openhands.events.stream import EventStream, session_exists
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.conversation import Conversation
from openhands.server.session.session import ROOM_KEY, Session
from openhands.server.settings import Settings
@@ -31,6 +32,8 @@ class StandaloneConversationManager(ConversationManager):
sio: socketio.AsyncServer
config: AppConfig
file_store: FileStore
# Defaulting monitoring_listener for temp backward compatibility.
monitoring_listener: MonitoringListener = MonitoringListener()
_local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict)
_local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict)
_active_conversations: dict[str, tuple[Conversation, int]] = field(
@@ -208,6 +211,7 @@ class StandaloneConversationManager(ConversationManager):
config=self.config,
sio=self.sio,
user_id=user_id,
monitoring_listener=self.monitoring_listener,
)
self._local_agent_loops_by_sid[sid] = session
asyncio.create_task(session.initialize_agent(settings, initial_user_msg))
@@ -280,5 +284,8 @@ class StandaloneConversationManager(ConversationManager):
sio: socketio.AsyncServer,
config: AppConfig,
file_store: FileStore,
monitoring_listener: MonitoringListener | None = None,
) -> ConversationManager:
return StandaloneConversationManager(sio, config, file_store)
return StandaloneConversationManager(
sio, config, file_store, monitoring_listener or MonitoringListener()
)

View File

@@ -0,0 +1,38 @@
from openhands.core.config.app_config import AppConfig
from openhands.events.event import Event
class MonitoringListener:
"""
Allow tracking of application activity for monitoring purposes.
Implementations should be non-disruptive, do not raise or block to perform I/O.
"""
def on_session_event(self, event: Event) -> None:
"""
Track metrics about events being added to a Session's EventStream.
"""
pass
def on_agent_session_start(self, success: bool, duration: float) -> None:
"""
Track an agent session start.
Success is true if startup completed without error.
Duration is start time in seconds observed by AgentSession.
"""
pass
def on_create_conversation(self) -> None:
"""
Track the beginning of conversation creation.
Does not currently capture whether it succeed.
"""
pass
@classmethod
def get_instance(
cls,
config: AppConfig,
) -> 'MonitoringListener':
return cls()

View File

@@ -18,6 +18,7 @@ from openhands.server.shared import (
SettingsStoreImpl,
config,
conversation_manager,
monitoring_listener,
)
from openhands.server.types import LLMAuthenticationError, MissingSettingsError
from openhands.storage.data_models.conversation_info import ConversationInfo
@@ -51,6 +52,7 @@ async def _create_new_conversation(
initial_user_msg: str | None,
image_urls: list[str] | None,
):
monitoring_listener.on_create_conversation()
logger.info('Loading settings')
settings_store = await SettingsStoreImpl.get_instance(config, user_id)
settings = await settings_store.load()

View File

@@ -19,6 +19,7 @@ from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime
from openhands.security import SecurityAnalyzer, options
from openhands.server.monitoring import MonitoringListener
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@@ -44,11 +45,13 @@ class AgentSession:
_started_at: float = 0
_closed: bool = False
loop: asyncio.AbstractEventLoop | None = None
monitoring_listener: MonitoringListener
def __init__(
self,
sid: str,
file_store: FileStore,
monitoring_listener: MonitoringListener,
status_callback: Optional[Callable] = None,
github_user_id: str | None = None,
):
@@ -64,6 +67,7 @@ class AgentSession:
self.file_store = file_store
self._status_callback = status_callback
self.github_user_id = github_user_id
self._monitoring_listener = monitoring_listener
async def start(
self,
@@ -98,10 +102,13 @@ class AgentSession:
logger.warning('Session closed before starting')
return
self._starting = True
self._started_at = time.time()
started_at = time.time()
self._started_at = started_at
finished = False # For monitoring
runtime_connected = False
try:
self._create_security_analyzer(config.security.security_analyzer)
await self._create_runtime(
runtime_connected = await self._create_runtime(
runtime_name=runtime_name,
config=config,
agent=agent,
@@ -134,8 +141,13 @@ class AgentSession:
ChangeAgentStateAction(AgentState.AWAITING_USER_INPUT),
EventSource.ENVIRONMENT,
)
finished = True
finally:
self._starting = False
success = finished and runtime_connected
self._monitoring_listener.on_agent_session_start(
success, (time.time() - started_at)
)
async def close(self):
"""Closes the Agent session"""
@@ -188,13 +200,16 @@ class AgentSession:
github_token: SecretStr | None = None,
selected_repository: str | None = None,
selected_branch: str | None = None,
):
) -> bool:
"""Creates a runtime instance
Parameters:
- runtime_name: The name of the runtime associated with the session
- config:
- agent:
Return True on successfully connected, False if could not connect.
Raises if already created, possibly in other situations.
"""
if self.runtime is not None:
@@ -239,7 +254,7 @@ class AgentSession:
self._status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
)
return
return False
repo_directory = None
if selected_repository:
@@ -264,6 +279,7 @@ class AgentSession:
logger.debug(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
)
return True
def _create_controller(
self,

View File

@@ -23,6 +23,7 @@ from openhands.events.observation.error import ErrorObservation
from openhands.events.serialization import event_from_dict, event_to_dict
from openhands.events.stream import EventStreamSubscriber
from openhands.llm.llm import LLM
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.agent_session import AgentSession
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.settings import Settings
@@ -40,6 +41,7 @@ class Session:
loop: asyncio.AbstractEventLoop
config: AppConfig
file_store: FileStore
monitoring_listener: MonitoringListener
user_id: str | None
def __init__(
@@ -47,6 +49,7 @@ class Session:
sid: str,
config: AppConfig,
file_store: FileStore,
monitoring_listener: MonitoringListener,
sio: socketio.AsyncServer | None,
user_id: str | None = None,
):
@@ -59,10 +62,12 @@ class Session:
file_store,
status_callback=self.queue_status_message,
github_user_id=user_id,
monitoring_listener=monitoring_listener,
)
self.agent_session.event_stream.subscribe(
EventStreamSubscriber.SERVER, self.on_event, self.sid
)
self.monitoring_listener = monitoring_listener
# Copying this means that when we update variables they are not applied to the shared global configuration!
self.config = deepcopy(config)
self.loop = asyncio.get_event_loop()
@@ -87,7 +92,6 @@ class Session:
AgentStateChangedObservation('', AgentState.LOADING),
EventSource.ENVIRONMENT,
)
agent_cls = settings.agent or self.config.default_agent
self.config.security.confirmation_mode = (
self.config.security.confirmation_mode
@@ -175,6 +179,7 @@ class Session:
Args:
event: The agent event (Observation or Action).
"""
self.monitoring_listener.on_session_event(event)
if isinstance(event, NullAction):
return
if isinstance(event, NullObservation):

View File

@@ -1,3 +1,4 @@
import inspect
import os
import socketio
@@ -8,6 +9,7 @@ from openhands.server.config.server_config import load_server_config
from openhands.server.conversation_manager.conversation_manager import (
ConversationManager,
)
from openhands.server.monitoring import MonitoringListener
from openhands.storage import get_file_store
from openhands.storage.conversation.conversation_store import ConversationStore
from openhands.storage.settings.settings_store import SettingsStore
@@ -32,11 +34,27 @@ sio = socketio.AsyncServer(
async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager
)
MonitoringListenerImpl = get_impl(
MonitoringListener,
server_config.monitoring_listener_class,
)
monitoring_listener = MonitoringListenerImpl.get_instance(config)
ConversationManagerImpl = get_impl(
ConversationManager, # type: ignore
server_config.conversation_manager_class,
)
conversation_manager = ConversationManagerImpl.get_instance(sio, config, file_store)
if len(inspect.signature(ConversationManagerImpl.get_instance).parameters) == 3:
# This conditional prevents a breaking change in February 2025.
# It should be safe to remove by April.
conversation_manager = ConversationManagerImpl.get_instance(sio, config, file_store)
else:
# This is the new signature.
conversation_manager = ConversationManagerImpl.get_instance( # type: ignore
sio, config, file_store, monitoring_listener
)
SettingsStoreImpl = get_impl(SettingsStore, server_config.settings_store_class) # type: ignore

View File

@@ -10,6 +10,7 @@ from openhands.events import EventStream, EventStreamSubscriber
from openhands.llm import LLM
from openhands.llm.metrics import Metrics
from openhands.runtime.base import Runtime
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.agent_session import AgentSession
from openhands.storage.memory import InMemoryFileStore
@@ -44,7 +45,11 @@ async def test_agent_session_start_with_no_state(mock_agent):
# Setup
file_store = InMemoryFileStore({})
session = AgentSession(sid='test-session', file_store=file_store)
session = AgentSession(
sid='test-session',
file_store=file_store,
monitoring_listener=MonitoringListener(),
)
# Create a mock runtime and set it up
mock_runtime = MagicMock(spec=Runtime)
@@ -52,6 +57,7 @@ async def test_agent_session_start_with_no_state(mock_agent):
# Mock the runtime creation to set up the runtime attribute
async def mock_create_runtime(*args, **kwargs):
session.runtime = mock_runtime
return True
session._create_runtime = AsyncMock(side_effect=mock_create_runtime)
@@ -114,7 +120,11 @@ async def test_agent_session_start_with_restored_state(mock_agent):
# Setup
file_store = InMemoryFileStore({})
session = AgentSession(sid='test-session', file_store=file_store)
session = AgentSession(
sid='test-session',
file_store=file_store,
monitoring_listener=MonitoringListener(),
)
# Create a mock runtime and set it up
mock_runtime = MagicMock(spec=Runtime)
@@ -122,6 +132,7 @@ async def test_agent_session_start_with_restored_state(mock_agent):
# Mock the runtime creation to set up the runtime attribute
async def mock_create_runtime(*args, **kwargs):
session.runtime = mock_runtime
return True
session._create_runtime = AsyncMock(side_effect=mock_create_runtime)

View File

@@ -7,6 +7,7 @@ from litellm.exceptions import (
from openhands.core.config.app_config import AppConfig
from openhands.core.config.llm_config import LLMConfig
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.session import Session
from openhands.storage.memory import InMemoryFileStore
@@ -45,6 +46,7 @@ async def test_notify_on_llm_retry(
config=config,
sio=mock_sio,
user_id='..uid..',
monitoring_listener=MonitoringListener(),
)
session.queue_status_message = AsyncMock()

View File

@@ -9,6 +9,7 @@ from openhands.core.config.app_config import AppConfig
from openhands.server.conversation_manager.standalone_conversation_manager import (
StandaloneConversationManager,
)
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.storage.memory import InMemoryFileStore
@@ -54,7 +55,7 @@ async def test_init_new_local_session():
),
):
async with StandaloneConversationManager(
sio, AppConfig(), InMemoryFileStore()
sio, AppConfig(), InMemoryFileStore(), MonitoringListener()
) as conversation_manager:
await conversation_manager.maybe_start_agent_loop(
'new-session-id', ConversationInitData(), 1
@@ -86,7 +87,7 @@ async def test_join_local_session():
),
):
async with StandaloneConversationManager(
sio, AppConfig(), InMemoryFileStore()
sio, AppConfig(), InMemoryFileStore(), MonitoringListener()
) as conversation_manager:
await conversation_manager.maybe_start_agent_loop(
'new-session-id', ConversationInitData(), None
@@ -121,7 +122,7 @@ async def test_add_to_local_event_stream():
),
):
async with StandaloneConversationManager(
sio, AppConfig(), InMemoryFileStore()
sio, AppConfig(), InMemoryFileStore(), MonitoringListener()
) as conversation_manager:
await conversation_manager.maybe_start_agent_loop(
'new-session-id', ConversationInitData(), 1
@@ -139,7 +140,7 @@ async def test_add_to_local_event_stream():
async def test_cleanup_session_connections():
sio = get_mock_sio()
async with StandaloneConversationManager(
sio, AppConfig(), InMemoryFileStore()
sio, AppConfig(), InMemoryFileStore(), MonitoringListener()
) as conversation_manager:
conversation_manager._local_connection_id_to_session_id.update(
{