OpenHands/openhands/utils/search_utils.py
Tim O'Farrell 6d14ce420e
Implement Export feature for V1 conversations with comprehensive unit tests (#12030)
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: hieptl <hieptl.developer@gmail.com>
2025-12-24 17:50:57 +00:00

33 lines
1013 B
Python

import base64
from typing import AsyncIterator, Callable
def offset_to_page_id(offset: int, has_next: bool) -> str | None:
if not has_next:
return None
next_page_id = base64.b64encode(str(offset).encode()).decode()
return next_page_id
def page_id_to_offset(page_id: str | None) -> int:
if not page_id:
return 0
offset = int(base64.b64decode(page_id).decode())
return offset
async def iterate(fn: Callable, **kwargs) -> AsyncIterator:
"""Iterate over paged result sets. Assumes that the results sets contain an array of result objects, and a next_page_id"""
kwargs = {**kwargs}
kwargs['page_id'] = None
while True:
result_set = await fn(**kwargs)
items = getattr(result_set, 'items', None)
if items is None:
items = getattr(result_set, 'results')
for result in items:
yield result
if result_set.next_page_id is None:
return
kwargs['page_id'] = result_set.next_page_id