mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
Added run_in_loop method (#9586)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
parent
6d62c341eb
commit
8fe2e006ee
@ -23,7 +23,12 @@ from openhands.storage.data_models.conversation_metadata import ConversationMeta
|
||||
from openhands.storage.data_models.conversation_status import ConversationStatus
|
||||
from openhands.storage.data_models.settings import Settings
|
||||
from openhands.storage.files import FileStore
|
||||
from openhands.utils.async_utils import GENERAL_TIMEOUT, call_async_from_sync, wait_all
|
||||
from openhands.utils.async_utils import (
|
||||
GENERAL_TIMEOUT,
|
||||
call_async_from_sync,
|
||||
run_in_loop,
|
||||
wait_all,
|
||||
)
|
||||
from openhands.utils.conversation_summary import (
|
||||
auto_generate_title,
|
||||
get_default_conversation_title,
|
||||
@ -61,8 +66,11 @@ class StandaloneConversationManager(ConversationManager):
|
||||
_conversations_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
||||
_cleanup_task: asyncio.Task | None = None
|
||||
_conversation_store_class: type[ConversationStore] | None = None
|
||||
_loop: asyncio.AbstractEventLoop | None = None
|
||||
|
||||
async def __aenter__(self):
|
||||
# Grab a reference to the main event loop. This is the loop in which `await sio.emit` must be called
|
||||
self._loop = asyncio.get_event_loop()
|
||||
self._cleanup_task = asyncio.create_task(self._cleanup_stale())
|
||||
return self
|
||||
|
||||
@ -297,10 +305,13 @@ class StandaloneConversationManager(ConversationManager):
|
||||
'id': 'AGENT_ERROR$TOO_MANY_CONVERSATIONS',
|
||||
'message': 'Too many conversations at once. If you are still using this one, try reactivating it by prompting the agent to continue',
|
||||
}
|
||||
await self.sio.emit(
|
||||
'oh_event',
|
||||
status_update_dict,
|
||||
to=ROOM_KEY.format(sid=oldest_conversation_id),
|
||||
await run_in_loop(
|
||||
self.sio.emit(
|
||||
'oh_event',
|
||||
status_update_dict,
|
||||
to=ROOM_KEY.format(sid=oldest_conversation_id),
|
||||
),
|
||||
self._loop, # type:ignore
|
||||
)
|
||||
await self.close_session(oldest_conversation_id)
|
||||
|
||||
@ -477,10 +488,13 @@ class StandaloneConversationManager(ConversationManager):
|
||||
'message': conversation_id,
|
||||
'conversation_title': conversation.title,
|
||||
}
|
||||
await self.sio.emit(
|
||||
'oh_event',
|
||||
status_update_dict,
|
||||
to=ROOM_KEY.format(sid=conversation_id),
|
||||
await run_in_loop(
|
||||
self.sio.emit(
|
||||
'oh_event',
|
||||
status_update_dict,
|
||||
to=ROOM_KEY.format(sid=conversation_id),
|
||||
),
|
||||
self._loop, # type:ignore
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f'Error emitting title update event: {e}')
|
||||
|
||||
@ -99,3 +99,25 @@ class AsyncException(Exception):
|
||||
|
||||
def __str__(self):
|
||||
return '\n'.join(str(e) for e in self.exceptions)
|
||||
|
||||
|
||||
async def run_in_loop(
|
||||
coro: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float = GENERAL_TIMEOUT
|
||||
):
|
||||
"""
|
||||
Mitigate the dreaded "coroutine was created in a different event loop" error.
|
||||
Pass the coroutine to a different event loop if needed.
|
||||
"""
|
||||
running_loop = asyncio.get_running_loop()
|
||||
if running_loop == loop:
|
||||
result = await coro
|
||||
return result
|
||||
|
||||
result = await call_sync_from_async(_run_in_loop, coro, loop, timeout)
|
||||
return result
|
||||
|
||||
|
||||
def _run_in_loop(coro: Coroutine, loop: asyncio.AbstractEventLoop, timeout: float):
|
||||
future = asyncio.run_coroutine_threadsafe(coro, loop)
|
||||
result = future.result(timeout=timeout)
|
||||
return result
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import concurrent.futures
|
||||
|
||||
import pytest
|
||||
|
||||
@ -6,6 +7,7 @@ from openhands.utils.async_utils import (
|
||||
AsyncException,
|
||||
call_async_from_sync,
|
||||
call_sync_from_async,
|
||||
run_in_loop,
|
||||
wait_all,
|
||||
)
|
||||
|
||||
@ -138,3 +140,202 @@ def test_call_async_from_sync_background_tasks():
|
||||
# (Even though some of these were started as background tasks)
|
||||
expected = ['dummy_started', 'dummy_started', 'bg_started', 'bg_finished']
|
||||
assert expected == events
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_same_loop():
|
||||
"""Test run_in_loop when the target loop is the same as the current loop."""
|
||||
|
||||
async def dummy_coro(value: int):
|
||||
await asyncio.sleep(0.01) # Small delay to make it actually async
|
||||
return value * 2
|
||||
|
||||
# Get the current running loop
|
||||
current_loop = asyncio.get_running_loop()
|
||||
|
||||
# Create a coroutine and run it in the same loop
|
||||
coro = dummy_coro(5)
|
||||
result = await run_in_loop(coro, current_loop)
|
||||
|
||||
assert result == 10
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_different_loop():
|
||||
"""Test run_in_loop when the target loop is different from the current loop."""
|
||||
import queue
|
||||
import threading
|
||||
|
||||
async def dummy_coro(value: int):
|
||||
await asyncio.sleep(0.01) # Small delay to make it actually async
|
||||
return value * 3
|
||||
|
||||
queue.Queue()
|
||||
loop_queue = queue.Queue()
|
||||
|
||||
def run_in_new_loop():
|
||||
"""Create and run a new event loop in a separate thread."""
|
||||
new_loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(new_loop)
|
||||
loop_queue.put(new_loop) # Share the loop with the main thread
|
||||
|
||||
try:
|
||||
# Keep the loop running for a short time
|
||||
new_loop.run_until_complete(asyncio.sleep(2.0))
|
||||
except Exception:
|
||||
pass # Expected when we stop the loop
|
||||
finally:
|
||||
new_loop.close()
|
||||
|
||||
# Start the new loop in a separate thread
|
||||
thread = threading.Thread(target=run_in_new_loop, daemon=True)
|
||||
thread.start()
|
||||
|
||||
# Get the new loop from the thread
|
||||
await asyncio.sleep(0.1) # Give thread time to start
|
||||
new_loop = loop_queue.get(timeout=1.0)
|
||||
|
||||
try:
|
||||
# Create a coroutine and run it in the different loop
|
||||
coro = dummy_coro(7)
|
||||
result = await run_in_loop(coro, new_loop)
|
||||
assert result == 21
|
||||
finally:
|
||||
# Clean up: stop the loop
|
||||
new_loop.call_soon_threadsafe(new_loop.stop)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_with_exception():
|
||||
"""Test run_in_loop when the coroutine raises an exception."""
|
||||
|
||||
async def failing_coro():
|
||||
await asyncio.sleep(0.01)
|
||||
raise ValueError('Test exception')
|
||||
|
||||
current_loop = asyncio.get_running_loop()
|
||||
coro = failing_coro()
|
||||
|
||||
with pytest.raises(ValueError, match='Test exception'):
|
||||
await run_in_loop(coro, current_loop)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_with_timeout():
|
||||
"""Test run_in_loop with a timeout when using different loops."""
|
||||
import queue
|
||||
import threading
|
||||
|
||||
async def slow_coro():
|
||||
await asyncio.sleep(1.0) # Sleep longer than timeout
|
||||
return 'should not reach here'
|
||||
|
||||
loop_queue = queue.Queue()
|
||||
|
||||
def run_in_new_loop():
|
||||
"""Create and run a new event loop in a separate thread."""
|
||||
new_loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(new_loop)
|
||||
loop_queue.put(new_loop)
|
||||
|
||||
try:
|
||||
# Keep the loop running for a short time
|
||||
new_loop.run_until_complete(asyncio.sleep(2.0))
|
||||
except Exception:
|
||||
pass # Expected when we stop the loop
|
||||
finally:
|
||||
new_loop.close()
|
||||
|
||||
# Start the new loop in a separate thread
|
||||
thread = threading.Thread(target=run_in_new_loop, daemon=True)
|
||||
thread.start()
|
||||
|
||||
# Get the new loop from the thread
|
||||
await asyncio.sleep(0.1) # Give thread time to start
|
||||
new_loop = loop_queue.get(timeout=1.0)
|
||||
|
||||
try:
|
||||
coro = slow_coro()
|
||||
# Test with a short timeout - this should raise a timeout exception
|
||||
with pytest.raises(
|
||||
(TimeoutError, concurrent.futures.TimeoutError)
|
||||
): # Could be TimeoutError or concurrent.futures.TimeoutError
|
||||
await run_in_loop(coro, new_loop, timeout=0.1)
|
||||
finally:
|
||||
# Clean up: stop the loop
|
||||
new_loop.call_soon_threadsafe(new_loop.stop)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_same_loop_no_timeout():
|
||||
"""Test that run_in_loop doesn't apply timeout when using the same loop."""
|
||||
|
||||
async def quick_coro():
|
||||
await asyncio.sleep(0.01)
|
||||
return 'completed'
|
||||
|
||||
current_loop = asyncio.get_running_loop()
|
||||
coro = quick_coro()
|
||||
|
||||
# Even with a very short timeout, this should work because
|
||||
# timeout is only applied when using different loops
|
||||
result = await run_in_loop(coro, current_loop, timeout=0.001)
|
||||
assert result == 'completed'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_return_value():
|
||||
"""Test that run_in_loop properly returns the coroutine result."""
|
||||
|
||||
async def return_dict():
|
||||
await asyncio.sleep(0.01)
|
||||
return {'key': 'value', 'number': 42}
|
||||
|
||||
current_loop = asyncio.get_running_loop()
|
||||
coro = return_dict()
|
||||
|
||||
result = await run_in_loop(coro, current_loop)
|
||||
|
||||
assert isinstance(result, dict)
|
||||
assert result['key'] == 'value'
|
||||
assert result['number'] == 42
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_in_loop_with_args_and_kwargs():
|
||||
"""Test run_in_loop with a coroutine that uses arguments."""
|
||||
|
||||
async def coro_with_args(a, b, multiplier=1):
|
||||
await asyncio.sleep(0.01)
|
||||
return (a + b) * multiplier
|
||||
|
||||
current_loop = asyncio.get_running_loop()
|
||||
|
||||
# Create coroutine with args and kwargs
|
||||
coro = coro_with_args(5, 10, multiplier=2)
|
||||
result = await run_in_loop(coro, current_loop)
|
||||
|
||||
assert result == 30 # (5 + 10) * 2
|
||||
|
||||
|
||||
def test_run_in_loop_sync_context():
|
||||
"""Test run_in_loop behavior when called from a synchronous context."""
|
||||
|
||||
async def dummy_coro(value: int):
|
||||
await asyncio.sleep(0.01)
|
||||
return value * 4
|
||||
|
||||
def sync_function():
|
||||
"""Function that runs in a new event loop."""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
coro = dummy_coro(6)
|
||||
# This simulates the scenario where we have a different target loop
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
|
||||
# Test the function in a synchronous context
|
||||
result = sync_function()
|
||||
assert result == 24
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user