mirror of
https://github.com/OpenHands/OpenHands.git
synced 2026-03-22 05:37:20 +08:00
fix(nested_event_store): correct reverse pagination in search_events and add unit test (#10418)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
@@ -28,16 +28,22 @@ class NestedEventStore(EventStoreABC):
|
||||
filter: EventFilter | None = None,
|
||||
limit: int | None = None,
|
||||
) -> Iterable[Event]:
|
||||
# Maintain explicit cursors for pagination to avoid accidental mutation.
|
||||
start_cursor = start_id
|
||||
end_cursor: int | None = None # Used only for reverse pagination
|
||||
while True:
|
||||
search_params = {
|
||||
'start_id': start_id,
|
||||
search_params: dict[str, int | bool] = {
|
||||
'start_id': start_cursor,
|
||||
'reverse': reverse,
|
||||
}
|
||||
if reverse and end_cursor is not None:
|
||||
# Bound the upper end when scanning backwards to avoid duplicates
|
||||
search_params['end_id'] = end_cursor
|
||||
if limit is not None:
|
||||
search_params['limit'] = min(100, limit)
|
||||
search_str = urlencode(search_params)
|
||||
url = f'{self.base_url}/events?{search_str}'
|
||||
headers = {}
|
||||
headers: dict[str, str] = {}
|
||||
if self.session_api_key:
|
||||
headers['X-Session-API-Key'] = self.session_api_key
|
||||
response = httpx.get(url, headers=headers)
|
||||
@@ -45,9 +51,17 @@ class NestedEventStore(EventStoreABC):
|
||||
# Follow pattern of event store not throwing errors on not found
|
||||
return
|
||||
result_set = response.json()
|
||||
|
||||
page_min_id: int | None = None
|
||||
forward_next_start = start_cursor
|
||||
for result in result_set['events']:
|
||||
event = event_from_dict(result)
|
||||
start_id = max(start_id, event.id + 1)
|
||||
if reverse:
|
||||
page_min_id = (
|
||||
event.id if page_min_id is None else min(page_min_id, event.id)
|
||||
)
|
||||
else:
|
||||
forward_next_start = max(forward_next_start, event.id + 1)
|
||||
if end_id == event.id:
|
||||
if not filter or filter.include(event):
|
||||
yield event
|
||||
@@ -59,6 +73,14 @@ class NestedEventStore(EventStoreABC):
|
||||
limit -= 1
|
||||
if limit <= 0:
|
||||
return
|
||||
|
||||
# Update pagination cursor for next request
|
||||
if reverse and page_min_id is not None:
|
||||
# Next page should end strictly before the smallest ID we just saw
|
||||
end_cursor = page_min_id - 1
|
||||
elif not reverse:
|
||||
start_cursor = forward_next_start
|
||||
|
||||
if not result_set['has_more']:
|
||||
return
|
||||
|
||||
|
||||
@@ -372,3 +372,66 @@ class TestNestedEventStore:
|
||||
'http://test-api.example.com/events?start_id=0&reverse=False',
|
||||
headers={'X-Session-API-Key': 'test-api-key'},
|
||||
)
|
||||
|
||||
@patch('httpx.get')
|
||||
def test_search_events_reverse_pagination_multiple_pages(
|
||||
self, mock_get, event_store
|
||||
):
|
||||
"""Ensure reverse pagination works across multiple server pages.
|
||||
|
||||
We emulate the remote /events endpoint by using an in-memory EventStream as the
|
||||
backing store and having httpx.get return paginated JSON responses derived from it.
|
||||
"""
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
|
||||
from openhands.events.event import EventSource
|
||||
from openhands.events.observation import NullObservation
|
||||
from openhands.events.serialization.event import event_to_dict
|
||||
from openhands.events.stream import EventStream
|
||||
from openhands.storage.memory import InMemoryFileStore
|
||||
|
||||
# Build a fake server-side store with many events so that server pagination kicks in
|
||||
fs = InMemoryFileStore()
|
||||
server_stream = EventStream('test-session', fs, user_id='test-user')
|
||||
total_events = 50
|
||||
for i in range(total_events):
|
||||
server_stream.add_event(NullObservation(f'e{i}'), EventSource.AGENT)
|
||||
|
||||
def server_side_get(url: str, headers: dict | None = None):
|
||||
# Parse query params like the FastAPI layer would receive
|
||||
parsed = urlparse(url)
|
||||
qs = parse_qs(parsed.query)
|
||||
start_id = int(qs.get('start_id', ['0'])[0])
|
||||
reverse = qs.get('reverse', ['False'])[0] == 'True'
|
||||
end_id = int(qs['end_id'][0]) if 'end_id' in qs else None
|
||||
limit = int(qs.get('limit', ['20'])[0]) # server default = 20
|
||||
|
||||
# Emulate server route logic: request limit+1 to compute has_more
|
||||
events = list(
|
||||
server_stream.search_events(
|
||||
start_id=start_id, end_id=end_id, reverse=reverse, limit=limit + 1
|
||||
)
|
||||
)
|
||||
has_more = len(events) > limit
|
||||
if has_more:
|
||||
events = events[:limit]
|
||||
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 200
|
||||
mock_response.json.return_value = {
|
||||
'events': [event_to_dict(e) for e in events],
|
||||
'has_more': has_more,
|
||||
}
|
||||
return mock_response
|
||||
|
||||
mock_get.side_effect = server_side_get
|
||||
|
||||
# Execute the nested search in reverse without a client-side limit
|
||||
results = list(event_store.search_events(reverse=True))
|
||||
|
||||
# Verify we received all events in descending order
|
||||
assert len(results) == total_events
|
||||
assert [e.id for e in results] == list(range(total_events - 1, -1, -1))
|
||||
|
||||
# Ensure multiple HTTP calls were made due to pagination
|
||||
assert mock_get.call_count >= 2
|
||||
|
||||
Reference in New Issue
Block a user