Include metadata like session_id in logs (#7145)

This commit is contained in:
Ray Myers 2025-03-07 17:28:51 -06:00 committed by GitHub
parent 83851c398d
commit dc9489ddcd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 149 additions and 64 deletions

View File

@ -192,10 +192,13 @@ class AgentController:
Args:
level (str): The logging level to use (e.g., 'info', 'debug', 'error').
message (str): The message to log.
extra (dict | None, optional): Additional fields to include in the log. Defaults to None.
extra (dict | None, optional): Additional fields to log. Includes session_id by default.
"""
message = f'[Agent Controller {self.id}] {message}'
getattr(logger, level)(message, extra=extra, stacklevel=2)
if extra is None:
extra = {}
extra_merged = {'session_id': self.id, **extra}
getattr(logger, level)(message, extra=extra_merged, stacklevel=2)
def update_state_before_step(self):
self.state.iteration += 1

View File

@ -279,15 +279,11 @@ class SensitiveDataFilter(logging.Filter):
return True
def get_console_handler(
log_level: int = logging.INFO, extra_info: str | None = None
) -> logging.StreamHandler:
def get_console_handler(log_level: int = logging.INFO) -> logging.StreamHandler:
"""Returns a console handler for logging."""
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
formatter_str = '\033[92m%(asctime)s - %(name)s:%(levelname)s\033[0m: %(filename)s:%(lineno)s - %(message)s'
if extra_info:
formatter_str = f'{extra_info} - ' + formatter_str
console_handler.setFormatter(ColoredFormatter(formatter_str, datefmt='%H:%M:%S'))
return console_handler
@ -470,3 +466,22 @@ def _setup_llm_logger(name: str, log_level: int) -> logging.Logger:
llm_prompt_logger = _setup_llm_logger('prompt', current_log_level)
llm_response_logger = _setup_llm_logger('response', current_log_level)
class OpenHandsLoggerAdapter(logging.LoggerAdapter):
    """Logger adapter that attaches a fixed set of contextual fields to each call.

    The adapter-level ``extra`` dict is combined with any ``extra`` dict passed
    to an individual logging call; on a key collision the per-call value wins.
    """

    extra: dict

    def __init__(self, logger=openhands_logger, extra=None):
        """Wrap ``logger`` and remember ``extra`` as the adapter-level context.

        Args:
            logger: The logger to delegate to. Defaults to ``openhands_logger``.
            extra: Fields to attach to every call. A falsy value (including
                ``None``) is replaced by an empty dict.
        """
        # Go through the base initializer rather than assigning attributes by
        # hand, so that anything it manages besides ``logger`` and ``extra``
        # (e.g. ``merge_extra`` on Python 3.13+) is set up as well.
        super().__init__(logger, extra or {})

    def process(self, msg, kwargs):
        """Merge the adapter's ``extra`` dict into the call's keyword arguments.

        If ``extra`` is supplied in ``kwargs`` as a dict, it is merged with the
        adapter's ``extra`` dict (per-call keys override adapter keys).
        Otherwise the adapter's ``extra`` dict is used as-is.
        Starting in Python 3.13, LoggerAdapter's ``merge_extra`` option will do this.

        Args:
            msg: The log message, returned unchanged.
            kwargs: Keyword arguments of the logging call; its ``'extra'`` entry
                is replaced in place.

        Returns:
            The ``(msg, kwargs)`` pair.
        """
        call_extra = kwargs.get('extra')
        if isinstance(call_extra, dict):
            # Build a new dict so neither the adapter's nor the caller's dict is mutated.
            kwargs['extra'] = {**self.extra, **call_extra}
        else:
            kwargs['extra'] = self.extra
        return msg, kwargs

View File

@ -73,14 +73,18 @@ class StandaloneConversationManager(ConversationManager):
if sid in self._active_conversations:
conversation, count = self._active_conversations[sid]
self._active_conversations[sid] = (conversation, count + 1)
logger.info(f'Reusing active conversation {sid}')
logger.info(
f'Reusing active conversation {sid}', extra={'session_id': sid}
)
return conversation
# Check if we have a detached conversation we can reuse
if sid in self._detached_conversations:
conversation, _ = self._detached_conversations.pop(sid)
self._active_conversations[sid] = (conversation, 1)
logger.info(f'Reusing detached conversation {sid}')
logger.info(
f'Reusing detached conversation {sid}', extra={'session_id': sid}
)
return conversation
# Create new conversation if none exists
@ -88,7 +92,10 @@ class StandaloneConversationManager(ConversationManager):
try:
await c.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Error connecting to conversation {c.sid}: {e}')
logger.error(
f'Error connecting to conversation {c.sid}: {e}',
extra={'session_id': sid},
)
await c.disconnect()
return None
end_time = time.time()
@ -101,7 +108,10 @@ class StandaloneConversationManager(ConversationManager):
async def join_conversation(
self, sid: str, connection_id: str, settings: Settings, user_id: str | None
):
logger.info(f'join_conversation:{sid}:{connection_id}')
logger.info(
f'join_conversation:{sid}:{connection_id}',
extra={'session_id': sid, 'user_id': user_id},
)
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
self._local_connection_id_to_session_id[connection_id] = sid
event_stream = await self._get_event_stream(sid)
@ -234,14 +244,17 @@ class StandaloneConversationManager(ConversationManager):
user_id: str | None,
initial_user_msg: MessageAction | None = None,
) -> EventStream:
logger.info(f'maybe_start_agent_loop:{sid}')
logger.info(f'maybe_start_agent_loop:{sid}', extra={'session_id': sid})
session: Session | None = None
if not await self.is_agent_loop_running(sid):
logger.info(f'start_agent_loop:{sid}')
logger.info(f'start_agent_loop:{sid}', extra={'session_id': sid})
response_ids = await self.get_running_agent_loops(user_id)
if len(response_ids) >= self.config.max_concurrent_conversations:
logger.info('too_many_sessions_for:{user_id}')
logger.info(
'too_many_sessions_for:{user_id}',
extra={'session_id': sid, 'user_id': user_id},
)
# Get the conversations sorted (oldest first)
conversation_store = await self._get_conversation_store(user_id)
conversations = await conversation_store.get_all_metadata(response_ids)
@ -257,7 +270,6 @@ class StandaloneConversationManager(ConversationManager):
config=self.config,
sio=self.sio,
user_id=user_id,
monitoring_listener=self.monitoring_listener,
)
self._local_agent_loops_by_sid[sid] = session
asyncio.create_task(session.initialize_agent(settings, initial_user_msg))
@ -273,15 +285,18 @@ class StandaloneConversationManager(ConversationManager):
event_stream = await self._get_event_stream(sid)
if not event_stream:
logger.error(f'No event stream after starting agent loop: {sid}')
logger.error(
f'No event stream after starting agent loop: {sid}',
extra={'session_id': sid},
)
raise RuntimeError(f'no_event_stream:{sid}')
return event_stream
async def _get_event_stream(self, sid: str) -> EventStream | None:
logger.info(f'_get_event_stream:{sid}')
logger.info(f'_get_event_stream:{sid}', extra={'session_id': sid})
session = self._local_agent_loops_by_sid.get(sid)
if session:
logger.info(f'found_local_agent_loop:{sid}')
logger.info(f'found_local_agent_loop:{sid}', extra={'session_id': sid})
return session.agent_session.event_stream
return None
@ -300,10 +315,15 @@ class StandaloneConversationManager(ConversationManager):
async def disconnect_from_session(self, connection_id: str):
sid = self._local_connection_id_to_session_id.pop(connection_id, None)
logger.info(f'disconnect_from_session:{connection_id}:{sid}')
logger.info(
f'disconnect_from_session:{connection_id}:{sid}', extra={'session_id': sid}
)
if not sid:
# This can occur if the init action was never run.
logger.warning(f'disconnect_from_uninitialized_session:{connection_id}')
logger.warning(
f'disconnect_from_uninitialized_session:{connection_id}',
extra={'session_id': sid},
)
return
async def close_session(self, sid: str):
@ -312,7 +332,7 @@ class StandaloneConversationManager(ConversationManager):
await self._close_session(sid)
async def _close_session(self, sid: str):
logger.info(f'_close_session:{sid}')
logger.info(f'_close_session:{sid}', extra={'session_id': sid})
# Clear up local variables
connection_ids_to_remove = list(
@ -320,18 +340,21 @@ class StandaloneConversationManager(ConversationManager):
for connection_id, conn_sid in self._local_connection_id_to_session_id.items()
if sid == conn_sid
)
logger.info(f'removing connections: {connection_ids_to_remove}')
logger.info(
f'removing connections: {connection_ids_to_remove}',
extra={'session_id': sid},
)
for connnnection_id in connection_ids_to_remove:
self._local_connection_id_to_session_id.pop(connnnection_id, None)
session = self._local_agent_loops_by_sid.pop(sid, None)
if not session:
logger.warning(f'no_session_to_close:{sid}')
logger.warning(f'no_session_to_close:{sid}', extra={'session_id': sid})
return
logger.info(f'closing_session:{session.sid}')
logger.info(f'closing_session:{session.sid}', extra={'session_id': sid})
await session.close()
logger.info(f'closed_session:{session.sid}')
logger.info(f'closed_session:{session.sid}', extra={'session_id': sid})
@classmethod
def get_instance(

View File

@ -20,7 +20,6 @@ from openhands.server.shared import (
SettingsStoreImpl,
config,
conversation_manager,
monitoring_listener,
)
from openhands.server.types import LLMAuthenticationError, MissingSettingsError
from openhands.storage.data_models.conversation_metadata import ConversationMetadata
@ -46,7 +45,10 @@ async def _create_new_conversation(
image_urls: list[str] | None,
attach_convo_id: bool = False,
):
monitoring_listener.on_create_conversation()
logger.info(
'Creating conversation',
extra={'signal': 'create_conversation', 'user_id': user_id},
)
logger.info('Loading settings')
settings_store = await SettingsStoreImpl.get_instance(config, user_id)
settings = await settings_store.load()
@ -82,7 +84,10 @@ async def _create_new_conversation(
while await conversation_store.exists(conversation_id):
logger.warning(f'Collision on conversation ID: {conversation_id}. Retrying...')
conversation_id = uuid.uuid4().hex
logger.info(f'New conversation ID: {conversation_id}')
logger.info(
f'New conversation ID: {conversation_id}',
extra={'user_id': user_id, 'session_id': conversation_id},
)
repository_title = (
selected_repository.split('/')[-1] if selected_repository else None
@ -100,7 +105,10 @@ async def _create_new_conversation(
)
)
logger.info(f'Starting agent loop for conversation {conversation_id}')
logger.info(
f'Starting agent loop for conversation {conversation_id}',
extra={'user_id': user_id, 'session_id': conversation_id},
)
initial_message_action = None
if initial_user_msg or image_urls:
user_msg = (
@ -290,5 +298,6 @@ async def _get_conversation_info(
except Exception as e:
logger.error(
f'Error loading conversation {conversation.conversation_id}: {str(e)}',
extra={'session_id': conversation.conversation_id},
)
return None

View File

@ -1,5 +1,6 @@
import asyncio
import time
from logging import LoggerAdapter
from typing import Callable
from pydantic import SecretStr
@ -9,7 +10,7 @@ from openhands.controller.agent import Agent
from openhands.controller.state.state import State
from openhands.core.config import AgentConfig, AppConfig, LLMConfig
from openhands.core.exceptions import AgentRuntimeUnavailableError
from openhands.core.logger import openhands_logger as logger
from openhands.core.logger import OpenHandsLoggerAdapter
from openhands.core.schema.agent import AgentState
from openhands.events.action import ChangeAgentStateAction, MessageAction
from openhands.events.event import EventSource
@ -19,7 +20,6 @@ from openhands.runtime import get_runtime_cls
from openhands.runtime.base import Runtime
from openhands.runtime.impl.remote.remote_runtime import RemoteRuntime
from openhands.security import SecurityAnalyzer, options
from openhands.server.monitoring import MonitoringListener
from openhands.storage.files import FileStore
from openhands.utils.async_utils import call_sync_from_async
from openhands.utils.shutdown_listener import should_continue
@ -45,13 +45,12 @@ class AgentSession:
_started_at: float = 0
_closed: bool = False
loop: asyncio.AbstractEventLoop | None = None
monitoring_listener: MonitoringListener
logger: LoggerAdapter
def __init__(
self,
sid: str,
file_store: FileStore,
monitoring_listener: MonitoringListener,
status_callback: Callable | None = None,
github_user_id: str | None = None,
):
@ -67,7 +66,9 @@ class AgentSession:
self.file_store = file_store
self._status_callback = status_callback
self.github_user_id = github_user_id
self._monitoring_listener = monitoring_listener
self.logger = OpenHandsLoggerAdapter(
extra={'session_id': sid, 'user_id': github_user_id}
)
async def start(
self,
@ -99,7 +100,7 @@ class AgentSession:
)
if self._closed:
logger.warning('Session closed before starting')
self.logger.warning('Session closed before starting')
return
self._starting = True
started_at = time.time()
@ -147,8 +148,13 @@ class AgentSession:
finally:
self._starting = False
success = finished and runtime_connected
self._monitoring_listener.on_agent_session_start(
success, (time.time() - started_at)
self.logger.info(
'Agent session start',
extra={
'signal': 'agent_session_start',
'success': success,
'duration': (time.time() - started_at),
},
)
async def close(self):
@ -157,12 +163,12 @@ class AgentSession:
return
self._closed = True
while self._starting and should_continue():
logger.debug(
self.logger.debug(
f'Waiting for initialization to finish before closing session {self.sid}'
)
await asyncio.sleep(WAIT_TIME_BEFORE_CLOSE_INTERVAL)
if time.time() <= self._started_at + WAIT_TIME_BEFORE_CLOSE:
logger.error(
self.logger.error(
f'Waited too long for initialization to finish before closing session {self.sid}'
)
break
@ -185,7 +191,7 @@ class AgentSession:
"""
if security_analyzer:
logger.debug(f'Using security analyzer: {security_analyzer}')
self.logger.debug(f'Using security analyzer: {security_analyzer}')
self.security_analyzer = options.SecurityAnalyzers.get(
security_analyzer, SecurityAnalyzer
)(self.event_stream)
@ -213,7 +219,7 @@ class AgentSession:
if self.runtime is not None:
raise RuntimeError('Runtime already created')
logger.debug(f'Initializing runtime `{runtime_name}` now...')
self.logger.debug(f'Initializing runtime `{runtime_name}` now...')
runtime_cls = get_runtime_cls(runtime_name)
env_vars = (
{
@ -247,7 +253,7 @@ class AgentSession:
try:
await self.runtime.connect()
except AgentRuntimeUnavailableError as e:
logger.error(f'Runtime initialization failed: {e}')
self.logger.error(f'Runtime initialization failed: {e}')
if self._status_callback:
self._status_callback(
'error', 'STATUS$ERROR_RUNTIME_DISCONNECTED', str(e)
@ -274,7 +280,7 @@ class AgentSession:
selected_repository, repo_directory
)
logger.debug(
self.logger.debug(
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
)
return True
@ -318,7 +324,7 @@ class AgentSession:
f'Plugins: {agent.sandbox_plugins}\n'
'-------------------------------------------------------------------------------------------'
)
logger.debug(msg)
self.logger.debug(msg)
controller = AgentController(
sid=self.sid,
@ -345,13 +351,13 @@ class AgentSession:
# if we have events in the stream.
try:
restored_state = State.restore_from_session(self.sid, self.file_store)
logger.debug(f'Restored state from session, sid: {self.sid}')
self.logger.debug(f'Restored state from session, sid: {self.sid}')
except Exception as e:
if self.event_stream.get_latest_event_id() > 0:
# if we have events, we should have a state
logger.warning(f'State could not be restored: {e}')
self.logger.warning(f'State could not be restored: {e}')
else:
logger.debug('No events found, no state to restore')
self.logger.debug('No events found, no state to restore')
return restored_state
def get_state(self) -> AgentState | None:

View File

@ -1,6 +1,7 @@
import asyncio
import time
from copy import deepcopy
from logging import LoggerAdapter
import socketio
@ -10,7 +11,7 @@ from openhands.core.config.condenser_config import (
LLMSummarizingCondenserConfig,
)
from openhands.core.const.guide_url import TROUBLESHOOTING_URL
from openhands.core.logger import openhands_logger as logger
from openhands.core.logger import OpenHandsLoggerAdapter
from openhands.core.schema import AgentState
from openhands.events.action import MessageAction, NullAction
from openhands.events.event import Event, EventSource
@ -23,7 +24,6 @@ from openhands.events.observation.error import ErrorObservation
from openhands.events.serialization import event_from_dict, event_to_dict
from openhands.events.stream import EventStreamSubscriber
from openhands.llm.llm import LLM
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.agent_session import AgentSession
from openhands.server.session.conversation_init_data import ConversationInitData
from openhands.server.settings import Settings
@ -41,15 +41,14 @@ class Session:
loop: asyncio.AbstractEventLoop
config: AppConfig
file_store: FileStore
monitoring_listener: MonitoringListener
user_id: str | None
logger: LoggerAdapter
def __init__(
self,
sid: str,
config: AppConfig,
file_store: FileStore,
monitoring_listener: MonitoringListener,
sio: socketio.AsyncServer | None,
user_id: str | None = None,
):
@ -57,17 +56,16 @@ class Session:
self.sio = sio
self.last_active_ts = int(time.time())
self.file_store = file_store
self.logger = OpenHandsLoggerAdapter(extra={'session_id': sid})
self.agent_session = AgentSession(
sid,
file_store,
status_callback=self.queue_status_message,
github_user_id=user_id,
monitoring_listener=monitoring_listener,
)
self.agent_session.event_stream.subscribe(
EventStreamSubscriber.SERVER, self.on_event, self.sid
)
self.monitoring_listener = monitoring_listener
# Copying this means that when we update variables they are not applied to the shared global configuration!
self.config = deepcopy(config)
self.loop = asyncio.get_event_loop()
@ -120,7 +118,7 @@ class Session:
default_condenser_config = LLMSummarizingCondenserConfig(
llm_config=llm.config, keep_first=3, max_size=40
)
logger.info(f'Enabling default condenser: {default_condenser_config}')
self.logger.info(f'Enabling default condenser: {default_condenser_config}')
agent_config.condenser = default_condenser_config
agent = Agent.get_cls(agent_cls)(llm, agent_config)
@ -148,7 +146,7 @@ class Session:
initial_message=initial_message,
)
except Exception as e:
logger.exception(f'Error creating agent_session: {e}')
self.logger.exception(f'Error creating agent_session: {e}')
await self.send_error(
f'Error creating agent_session. Please check Docker is running and visit `{TROUBLESHOOTING_URL}` for more debugging information..'
)
@ -179,7 +177,6 @@ class Session:
Args:
event: The agent event (Observation or Action).
"""
self.monitoring_listener.on_session_event(event)
if isinstance(event, NullAction):
return
if isinstance(event, NullObservation):
@ -196,6 +193,14 @@ class Session:
event_dict = event_to_dict(event)
event_dict['source'] = EventSource.AGENT
await self.send(event_dict)
if (
isinstance(event, AgentStateChangedObservation)
and event.agent_state == AgentState.ERROR
):
self.logger.info(
'Agent status error',
extra={'signal': 'agent_status_error'},
)
elif isinstance(event, ErrorObservation):
# send error events as agent events to the UI
event_dict = event_to_dict(event)
@ -236,7 +241,7 @@ class Session:
self.last_active_ts = int(time.time())
return True
except RuntimeError as e:
logger.error(f'Error sending data to websocket: {str(e)}')
self.logger.error(f'Error sending data to websocket: {str(e)}')
self.is_alive = False
return False
@ -251,6 +256,10 @@ class Session:
controller = self.agent_session.controller
if controller is not None and not agent_session.is_closed():
await controller.set_agent_state_to(AgentState.ERROR)
self.logger.info(
'Agent status error',
extra={'signal': 'agent_status_error'},
)
await self.send(
{'status_update': True, 'type': msg_type, 'id': id, 'message': message}
)

View File

@ -10,7 +10,6 @@ from openhands.events import EventStream, EventStreamSubscriber
from openhands.llm import LLM
from openhands.llm.metrics import Metrics
from openhands.runtime.base import Runtime
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.agent_session import AgentSession
from openhands.storage.memory import InMemoryFileStore
@ -48,7 +47,6 @@ async def test_agent_session_start_with_no_state(mock_agent):
session = AgentSession(
sid='test-session',
file_store=file_store,
monitoring_listener=MonitoringListener(),
)
# Create a mock runtime and set it up
@ -123,7 +121,6 @@ async def test_agent_session_start_with_restored_state(mock_agent):
session = AgentSession(
sid='test-session',
file_store=file_store,
monitoring_listener=MonitoringListener(),
)
# Create a mock runtime and set it up

View File

@ -6,7 +6,7 @@ from unittest.mock import patch
import pytest
from openhands.core.config import AppConfig, LLMConfig
from openhands.core.logger import json_log_handler
from openhands.core.logger import OpenHandsLoggerAdapter, json_log_handler
from openhands.core.logger import openhands_logger as openhands_logger
@ -131,7 +131,7 @@ def test_special_cases_masking(test_handler):
assert value not in log_output
class TestLogOutput:
class TestJsonOutput:
def test_info(self, json_handler):
logger, string_io = json_handler
@ -160,3 +160,28 @@ class TestLogOutput:
'message': 'Test message',
'level': 'INFO',
}
def test_extra_fields_from_adapter(self, json_handler):
    """Fields from the adapter and from the individual call both reach the JSON output."""
    base_logger, buffer = json_handler
    adapter = OpenHandsLoggerAdapter(
        base_logger, extra={'context_field': '..val..'}
    )

    adapter.info('Test message', extra={'log_fied': '..val..'})

    record = json.loads(buffer.getvalue())
    # The timestamp is excluded from the comparison.
    record.pop('timestamp')
    expected = {
        'context_field': '..val..',
        'log_fied': '..val..',
        'message': 'Test message',
        'level': 'INFO',
    }
    assert record == expected
def test_extra_fields_from_adapter_can_override(self, json_handler):
    """A per-call extra value replaces the adapter's value for the same key."""
    base_logger, buffer = json_handler
    adapter = OpenHandsLoggerAdapter(base_logger, extra={'override': 'a'})

    adapter.info('Test message', extra={'override': 'b'})

    record = json.loads(buffer.getvalue())
    # The timestamp is excluded from the comparison.
    record.pop('timestamp')
    expected = {
        'override': 'b',
        'message': 'Test message',
        'level': 'INFO',
    }
    assert record == expected

View File

@ -7,7 +7,6 @@ from litellm.exceptions import (
from openhands.core.config.app_config import AppConfig
from openhands.core.config.llm_config import LLMConfig
from openhands.server.monitoring import MonitoringListener
from openhands.server.session.session import Session
from openhands.storage.memory import InMemoryFileStore
@ -46,7 +45,6 @@ async def test_notify_on_llm_retry(
config=config,
sio=mock_sio,
user_id='..uid..',
monitoring_listener=MonitoringListener(),
)
session.queue_status_message = AsyncMock()