diff --git a/openhands/events/nested_event_store.py b/openhands/events/nested_event_store.py index 775475394e..cd7819eb1a 100644 --- a/openhands/events/nested_event_store.py +++ b/openhands/events/nested_event_store.py @@ -28,16 +28,22 @@ class NestedEventStore(EventStoreABC): filter: EventFilter | None = None, limit: int | None = None, ) -> Iterable[Event]: + # Maintain explicit cursors for pagination to avoid accidental mutation. + start_cursor = start_id + end_cursor: int | None = None # Used only for reverse pagination while True: - search_params = { - 'start_id': start_id, + search_params: dict[str, int | bool] = { + 'start_id': start_cursor, 'reverse': reverse, } + if reverse and end_cursor is not None: + # Bound the upper end when scanning backwards to avoid duplicates + search_params['end_id'] = end_cursor if limit is not None: search_params['limit'] = min(100, limit) search_str = urlencode(search_params) url = f'{self.base_url}/events?{search_str}' - headers = {} + headers: dict[str, str] = {} if self.session_api_key: headers['X-Session-API-Key'] = self.session_api_key response = httpx.get(url, headers=headers) @@ -45,9 +51,17 @@ class NestedEventStore(EventStoreABC): # Follow pattern of event store not throwing errors on not found return result_set = response.json() + + page_min_id: int | None = None + forward_next_start = start_cursor for result in result_set['events']: event = event_from_dict(result) - start_id = max(start_id, event.id + 1) + if reverse: + page_min_id = ( + event.id if page_min_id is None else min(page_min_id, event.id) + ) + else: + forward_next_start = max(forward_next_start, event.id + 1) if end_id == event.id: if not filter or filter.include(event): yield event @@ -59,6 +73,14 @@ class NestedEventStore(EventStoreABC): limit -= 1 if limit <= 0: return + + # Update pagination cursor for next request + if reverse and page_min_id is not None: + # Next page should end strictly before the smallest ID we just saw + end_cursor = page_min_id - 1 + elif not reverse: + start_cursor = forward_next_start + if not result_set['has_more']: return diff --git a/tests/unit/test_nested_event_store.py b/tests/unit/test_nested_event_store.py index e6dda119c5..af9b6ea7f0 100644 --- a/tests/unit/test_nested_event_store.py +++ b/tests/unit/test_nested_event_store.py @@ -372,3 +372,66 @@ class TestNestedEventStore: 'http://test-api.example.com/events?start_id=0&reverse=False', headers={'X-Session-API-Key': 'test-api-key'}, ) + + @patch('httpx.get') + def test_search_events_reverse_pagination_multiple_pages( + self, mock_get, event_store + ): + """Ensure reverse pagination works across multiple server pages. + + We emulate the remote /events endpoint by using an in-memory EventStream as the + backing store and having httpx.get return paginated JSON responses derived from it. + """ + from urllib.parse import parse_qs, urlparse + + from openhands.events.event import EventSource + from openhands.events.observation import NullObservation + from openhands.events.serialization.event import event_to_dict + from openhands.events.stream import EventStream + from openhands.storage.memory import InMemoryFileStore + + # Build a fake server-side store with many events so that server pagination kicks in + fs = InMemoryFileStore() + server_stream = EventStream('test-session', fs, user_id='test-user') + total_events = 50 + for i in range(total_events): + server_stream.add_event(NullObservation(f'e{i}'), EventSource.AGENT) + + def server_side_get(url: str, headers: dict | None = None): + # Parse query params like the FastAPI layer would receive + parsed = urlparse(url) + qs = parse_qs(parsed.query) + start_id = int(qs.get('start_id', ['0'])[0]) + reverse = qs.get('reverse', ['False'])[0] == 'True' + end_id = int(qs['end_id'][0]) if 'end_id' in qs else None + limit = int(qs.get('limit', ['20'])[0]) # server default = 20 + + # Emulate server route logic: request limit+1 to compute has_more + events = list( + server_stream.search_events( + start_id=start_id, end_id=end_id, reverse=reverse, limit=limit + 1 + ) + ) + has_more = len(events) > limit + if has_more: + events = events[:limit] + + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + 'events': [event_to_dict(e) for e in events], + 'has_more': has_more, + } + return mock_response + + mock_get.side_effect = server_side_get + + # Execute the nested search in reverse without a client-side limit + results = list(event_store.search_events(reverse=True)) + + # Verify we received all events in descending order + assert len(results) == total_events + assert [e.id for e in results] == list(range(total_events - 1, -1, -1)) + + # Ensure multiple HTTP calls were made due to pagination + assert mock_get.call_count >= 2