diff --git a/openhands/server/app.py b/openhands/server/app.py index b168795978..6207ccf2eb 100644 --- a/openhands/server/app.py +++ b/openhands/server/app.py @@ -1,4 +1,5 @@ import warnings +from contextlib import asynccontextmanager with warnings.catch_warnings(): warnings.simplefilter('ignore') @@ -21,10 +22,17 @@ from openhands.server.routes.feedback import app as feedback_api_router from openhands.server.routes.files import app as files_api_router from openhands.server.routes.public import app as public_api_router from openhands.server.routes.security import app as security_api_router -from openhands.server.shared import config +from openhands.server.shared import config, session_manager from openhands.utils.import_utils import get_impl -app = FastAPI() + +@asynccontextmanager +async def _lifespan(app: FastAPI): + async with session_manager: + yield + + +app = FastAPI(lifespan=_lifespan) app.add_middleware( LocalhostCORSMiddleware, allow_credentials=True, diff --git a/openhands/server/session/manager.py b/openhands/server/session/manager.py index 10b34b5dd2..fbcf4eaa83 100644 --- a/openhands/server/session/manager.py +++ b/openhands/server/session/manager.py @@ -52,6 +52,7 @@ class SessionManager: """ We use a redis backchannel to send actions between server nodes """ + logger.debug('_redis_subscribe') redis_client = self._get_redis_client() pubsub = redis_client.pubsub() await pubsub.subscribe('oh_event') @@ -75,7 +76,7 @@ class SessionManager: async def _process_message(self, message: dict): data = json.loads(message['data']) - logger.info(f'got_published_message:{message}') + logger.debug(f'got_published_message:{message}') sid = data['sid'] message_type = data['message_type'] if message_type == 'event': @@ -112,7 +113,7 @@ class SessionManager: elif message_type == 'session_closing': # Session closing event - We only get this in the event of graceful shutdown, # which can't be guaranteed - nodes can simply vanish unexpectedly! - logger.info(f'session_closing:{sid}') + logger.debug(f'session_closing:{sid}') for ( connection_id, local_sid, @@ -142,7 +143,9 @@ class SessionManager: async def detach_from_conversation(self, conversation: Conversation): await conversation.disconnect() - async def init_or_join_session(self, sid: str, connection_id: str, session_init_data: SessionInitData): + async def init_or_join_session( + self, sid: str, connection_id: str, session_init_data: SessionInitData + ): await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid)) self.local_connection_id_to_session_id[connection_id] = sid @@ -165,6 +168,7 @@ class SessionManager: flag = asyncio.Event() self._session_is_running_flags[sid] = flag try: + logger.debug(f'publish:is_session_running:{sid}') await self._get_redis_client().publish( 'oh_event', json.dumps(