diff --git a/openhands/server/config/server_config.py b/openhands/server/config/server_config.py index ae86d8d43a..63bd7a4f28 100644 --- a/openhands/server/config/server_config.py +++ b/openhands/server/config/server_config.py @@ -17,6 +17,7 @@ class ServerConfig(ServerConfigInterface): 'openhands.storage.conversation.file_conversation_store.FileConversationStore' ) conversation_manager_class: str = 'openhands.server.conversation_manager.standalone_conversation_manager.StandaloneConversationManager' + monitoring_listener_class: str = 'openhands.server.monitoring.MonitoringListener' def verify_config(self): if self.config_cls: diff --git a/openhands/server/conversation_manager/standalone_conversation_manager.py b/openhands/server/conversation_manager/standalone_conversation_manager.py index 2f49de63dc..7440342f36 100644 --- a/openhands/server/conversation_manager/standalone_conversation_manager.py +++ b/openhands/server/conversation_manager/standalone_conversation_manager.py @@ -11,6 +11,7 @@ from openhands.core.logger import openhands_logger as logger from openhands.core.schema.agent import AgentState from openhands.events.action import MessageAction from openhands.events.stream import EventStream, session_exists +from openhands.server.monitoring import MonitoringListener from openhands.server.session.conversation import Conversation from openhands.server.session.session import ROOM_KEY, Session from openhands.server.settings import Settings @@ -31,6 +32,8 @@ class StandaloneConversationManager(ConversationManager): sio: socketio.AsyncServer config: AppConfig file_store: FileStore + # Defaulting monitoring_listener for temp backward compatibility. 
+ monitoring_listener: MonitoringListener = MonitoringListener() _local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict) _local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict) _active_conversations: dict[str, tuple[Conversation, int]] = field( @@ -208,6 +211,7 @@ class StandaloneConversationManager(ConversationManager): config=self.config, sio=self.sio, user_id=user_id, + monitoring_listener=self.monitoring_listener, ) self._local_agent_loops_by_sid[sid] = session asyncio.create_task(session.initialize_agent(settings, initial_user_msg)) @@ -280,5 +284,8 @@ class StandaloneConversationManager(ConversationManager): sio: socketio.AsyncServer, config: AppConfig, file_store: FileStore, + monitoring_listener: MonitoringListener | None = None, ) -> ConversationManager: - return StandaloneConversationManager(sio, config, file_store) + return StandaloneConversationManager( + sio, config, file_store, monitoring_listener or MonitoringListener() + ) diff --git a/openhands/server/monitoring.py b/openhands/server/monitoring.py new file mode 100644 index 0000000000..e5c4462880 --- /dev/null +++ b/openhands/server/monitoring.py @@ -0,0 +1,38 @@ +from openhands.core.config.app_config import AppConfig +from openhands.events.event import Event + + +class MonitoringListener: + """ + Allow tracking of application activity for monitoring purposes. + + Implementations should be non-disruptive: they must not raise or block to perform I/O. + """ + + def on_session_event(self, event: Event) -> None: + """ + Track metrics about events being added to a Session's EventStream. + """ + pass + + def on_agent_session_start(self, success: bool, duration: float) -> None: + """ + Track an agent session start. + Success is true if startup completed without error. + Duration is the startup time in seconds, as observed by AgentSession. + """ + pass + + def on_create_conversation(self) -> None: + """ + Track the beginning of conversation creation. 
+ Does not currently capture whether it succeeds. + """ + pass + + @classmethod + def get_instance( + cls, + config: AppConfig, + ) -> 'MonitoringListener': + return cls() diff --git a/openhands/server/routes/manage_conversations.py b/openhands/server/routes/manage_conversations.py index 4ef130e49e..88efc03cbf 100644 --- a/openhands/server/routes/manage_conversations.py +++ b/openhands/server/routes/manage_conversations.py @@ -18,6 +18,7 @@ from openhands.server.shared import ( SettingsStoreImpl, config, conversation_manager, + monitoring_listener, ) from openhands.server.types import LLMAuthenticationError, MissingSettingsError from openhands.storage.data_models.conversation_info import ConversationInfo @@ -51,6 +52,7 @@ async def _create_new_conversation( initial_user_msg: str | None, image_urls: list[str] | None, ): + monitoring_listener.on_create_conversation() logger.info('Loading settings') settings_store = await SettingsStoreImpl.get_instance(config, user_id) settings = await settings_store.load() diff --git a/openhands/server/session/agent_session.py b/openhands/server/session/agent_session.py index 3da21eb0d2..2f71f8e62e 100644 --- a/openhands/server/session/agent_session.py +++ b/openhands/server/session/agent_session.py @@ -19,6 +19,7 @@ from openhands.runtime import get_runtime_cls from openhands.runtime.base import Runtime from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime from openhands.security import SecurityAnalyzer, options +from openhands.server.monitoring import MonitoringListener from openhands.storage.files import FileStore from openhands.utils.async_utils import call_sync_from_async from openhands.utils.shutdown_listener import should_continue @@ -44,11 +45,13 @@ class AgentSession: _started_at: float = 0 _closed: bool = False loop: asyncio.AbstractEventLoop | None = None + monitoring_listener: MonitoringListener def __init__( self, sid: str, file_store: FileStore, + monitoring_listener: MonitoringListener, status_callback: 
Optional[Callable] = None, github_user_id: str | None = None, ): @@ -64,6 +67,7 @@ class AgentSession: self.file_store = file_store self._status_callback = status_callback self.github_user_id = github_user_id + self._monitoring_listener = monitoring_listener async def start( self, @@ -98,10 +102,13 @@ class AgentSession: logger.warning('Session closed before starting') return self._starting = True - self._started_at = time.time() + started_at = time.time() + self._started_at = started_at + finished = False # For monitoring + runtime_connected = False try: self._create_security_analyzer(config.security.security_analyzer) - await self._create_runtime( + runtime_connected = await self._create_runtime( runtime_name=runtime_name, config=config, agent=agent, @@ -134,8 +141,13 @@ class AgentSession: ChangeAgentStateAction(AgentState.AWAITING_USER_INPUT), EventSource.ENVIRONMENT, ) + finished = True finally: self._starting = False + success = finished and runtime_connected + self._monitoring_listener.on_agent_session_start( + success, (time.time() - started_at) + ) async def close(self): """Closes the Agent session""" @@ -188,13 +200,16 @@ class AgentSession: github_token: SecretStr | None = None, selected_repository: str | None = None, selected_branch: str | None = None, - ): + ) -> bool: """Creates a runtime instance Parameters: - runtime_name: The name of the runtime associated with the session - config: - agent: + + Returns True if the runtime connected successfully, False if it could not connect. + Raises if the runtime was already created, and possibly in other situations. 
""" if self.runtime is not None: @@ -239,7 +254,7 @@ class AgentSession: self._status_callback( 'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e) ) - return + return False repo_directory = None if selected_repository: @@ -264,6 +279,7 @@ class AgentSession: logger.debug( f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}' ) + return True def _create_controller( self, diff --git a/openhands/server/session/session.py b/openhands/server/session/session.py index d7807fc947..1fa99f59f7 100644 --- a/openhands/server/session/session.py +++ b/openhands/server/session/session.py @@ -23,6 +23,7 @@ from openhands.events.observation.error import ErrorObservation from openhands.events.serialization import event_from_dict, event_to_dict from openhands.events.stream import EventStreamSubscriber from openhands.llm.llm import LLM +from openhands.server.monitoring import MonitoringListener from openhands.server.session.agent_session import AgentSession from openhands.server.session.conversation_init_data import ConversationInitData from openhands.server.settings import Settings @@ -40,6 +41,7 @@ class Session: loop: asyncio.AbstractEventLoop config: AppConfig file_store: FileStore + monitoring_listener: MonitoringListener user_id: str | None def __init__( @@ -47,6 +49,7 @@ class Session: sid: str, config: AppConfig, file_store: FileStore, + monitoring_listener: MonitoringListener, sio: socketio.AsyncServer | None, user_id: str | None = None, ): @@ -59,10 +62,12 @@ class Session: file_store, status_callback=self.queue_status_message, github_user_id=user_id, + monitoring_listener=monitoring_listener, ) self.agent_session.event_stream.subscribe( EventStreamSubscriber.SERVER, self.on_event, self.sid ) + self.monitoring_listener = monitoring_listener # Copying this means that when we update variables they are not applied to the shared global configuration! 
self.config = deepcopy(config) self.loop = asyncio.get_event_loop() @@ -87,7 +92,6 @@ class Session: AgentStateChangedObservation('', AgentState.LOADING), EventSource.ENVIRONMENT, ) - agent_cls = settings.agent or self.config.default_agent self.config.security.confirmation_mode = ( self.config.security.confirmation_mode @@ -175,6 +179,7 @@ class Session: Args: event: The agent event (Observation or Action). """ + self.monitoring_listener.on_session_event(event) if isinstance(event, NullAction): return if isinstance(event, NullObservation): diff --git a/openhands/server/shared.py b/openhands/server/shared.py index 269f42ffd5..2a472e4121 100644 --- a/openhands/server/shared.py +++ b/openhands/server/shared.py @@ -1,3 +1,4 @@ +import inspect import os import socketio @@ -8,6 +9,7 @@ from openhands.server.config.server_config import load_server_config from openhands.server.conversation_manager.conversation_manager import ( ConversationManager, ) +from openhands.server.monitoring import MonitoringListener from openhands.storage import get_file_store from openhands.storage.conversation.conversation_store import ConversationStore from openhands.storage.settings.settings_store import SettingsStore @@ -32,11 +34,27 @@ sio = socketio.AsyncServer( async_mode='asgi', cors_allowed_origins='*', client_manager=client_manager ) +MonitoringListenerImpl = get_impl( + MonitoringListener, + server_config.monitoring_listener_class, +) + +monitoring_listener = MonitoringListenerImpl.get_instance(config) + ConversationManagerImpl = get_impl( ConversationManager, # type: ignore server_config.conversation_manager_class, ) -conversation_manager = ConversationManagerImpl.get_instance(sio, config, file_store) + +if len(inspect.signature(ConversationManagerImpl.get_instance).parameters) == 3: + # This conditional prevents a breaking change in February 2025. + # It should be safe to remove by April. 
+ conversation_manager = ConversationManagerImpl.get_instance(sio, config, file_store) +else: + # This is the new signature. + conversation_manager = ConversationManagerImpl.get_instance( # type: ignore + sio, config, file_store, monitoring_listener + ) SettingsStoreImpl = get_impl(SettingsStore, server_config.settings_store_class) # type: ignore diff --git a/tests/unit/test_agent_session.py b/tests/unit/test_agent_session.py index fdbe3d0a02..a9a15cd083 100644 --- a/tests/unit/test_agent_session.py +++ b/tests/unit/test_agent_session.py @@ -10,6 +10,7 @@ from openhands.events import EventStream, EventStreamSubscriber from openhands.llm import LLM from openhands.llm.metrics import Metrics from openhands.runtime.base import Runtime +from openhands.server.monitoring import MonitoringListener from openhands.server.session.agent_session import AgentSession from openhands.storage.memory import InMemoryFileStore @@ -44,7 +45,11 @@ async def test_agent_session_start_with_no_state(mock_agent): # Setup file_store = InMemoryFileStore({}) - session = AgentSession(sid='test-session', file_store=file_store) + session = AgentSession( + sid='test-session', + file_store=file_store, + monitoring_listener=MonitoringListener(), + ) # Create a mock runtime and set it up mock_runtime = MagicMock(spec=Runtime) @@ -52,6 +57,7 @@ async def test_agent_session_start_with_no_state(mock_agent): # Mock the runtime creation to set up the runtime attribute async def mock_create_runtime(*args, **kwargs): session.runtime = mock_runtime + return True session._create_runtime = AsyncMock(side_effect=mock_create_runtime) @@ -114,7 +120,11 @@ async def test_agent_session_start_with_restored_state(mock_agent): # Setup file_store = InMemoryFileStore({}) - session = AgentSession(sid='test-session', file_store=file_store) + session = AgentSession( + sid='test-session', + file_store=file_store, + monitoring_listener=MonitoringListener(), + ) # Create a mock runtime and set it up mock_runtime = 
MagicMock(spec=Runtime) @@ -122,6 +132,7 @@ async def test_agent_session_start_with_restored_state(mock_agent): # Mock the runtime creation to set up the runtime attribute async def mock_create_runtime(*args, **kwargs): session.runtime = mock_runtime + return True session._create_runtime = AsyncMock(side_effect=mock_create_runtime) diff --git a/tests/unit/test_session.py b/tests/unit/test_session.py index 7f61e66f01..d2709e9242 100644 --- a/tests/unit/test_session.py +++ b/tests/unit/test_session.py @@ -7,6 +7,7 @@ from litellm.exceptions import ( from openhands.core.config.app_config import AppConfig from openhands.core.config.llm_config import LLMConfig +from openhands.server.monitoring import MonitoringListener from openhands.server.session.session import Session from openhands.storage.memory import InMemoryFileStore @@ -45,6 +46,7 @@ async def test_notify_on_llm_retry( config=config, sio=mock_sio, user_id='..uid..', + monitoring_listener=MonitoringListener(), ) session.queue_status_message = AsyncMock() diff --git a/tests/unit/test_standalone_conversation_manager.py b/tests/unit/test_standalone_conversation_manager.py index 61ea379831..2e49768aed 100644 --- a/tests/unit/test_standalone_conversation_manager.py +++ b/tests/unit/test_standalone_conversation_manager.py @@ -9,6 +9,7 @@ from openhands.core.config.app_config import AppConfig from openhands.server.conversation_manager.standalone_conversation_manager import ( StandaloneConversationManager, ) +from openhands.server.monitoring import MonitoringListener from openhands.server.session.conversation_init_data import ConversationInitData from openhands.storage.memory import InMemoryFileStore @@ -54,7 +55,7 @@ async def test_init_new_local_session(): ), ): async with StandaloneConversationManager( - sio, AppConfig(), InMemoryFileStore() + sio, AppConfig(), InMemoryFileStore(), MonitoringListener() ) as conversation_manager: await conversation_manager.maybe_start_agent_loop( 'new-session-id', 
ConversationInitData(), 1 @@ -86,7 +87,7 @@ async def test_join_local_session(): ), ): async with StandaloneConversationManager( - sio, AppConfig(), InMemoryFileStore() + sio, AppConfig(), InMemoryFileStore(), MonitoringListener() ) as conversation_manager: await conversation_manager.maybe_start_agent_loop( 'new-session-id', ConversationInitData(), None @@ -121,7 +122,7 @@ async def test_add_to_local_event_stream(): ), ): async with StandaloneConversationManager( - sio, AppConfig(), InMemoryFileStore() + sio, AppConfig(), InMemoryFileStore(), MonitoringListener() ) as conversation_manager: await conversation_manager.maybe_start_agent_loop( 'new-session-id', ConversationInitData(), 1 @@ -139,7 +140,7 @@ async def test_add_to_local_event_stream(): async def test_cleanup_session_connections(): sio = get_mock_sio() async with StandaloneConversationManager( - sio, AppConfig(), InMemoryFileStore() + sio, AppConfig(), InMemoryFileStore(), MonitoringListener() ) as conversation_manager: conversation_manager._local_connection_id_to_session_id.update( {