Add event search endpoints with filtering and pagination (#8538)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
tofarr
2025-05-16 09:51:40 -06:00
committed by GitHub
parent 21dd91de63
commit 4733270e3c
4 changed files with 214 additions and 1 deletions

View File

@@ -78,6 +78,7 @@ class EventStore(EventStoreABC):
end_id: int | None = None,
reverse: bool = False,
filter: EventFilter | None = None,
limit: int | None = None,
) -> Iterable[Event]:
"""
Retrieve events from the event stream, optionally filtering out events of a given type
@@ -107,6 +108,7 @@ class EventStore(EventStoreABC):
step = 1
cache_page = _DUMMY_PAGE
num_results = 0
for index in range(start_id, end_id, step):
if not should_continue():
return
@@ -121,6 +123,9 @@ class EventStore(EventStoreABC):
if event:
if not filter or filter.include(event):
yield event
num_results += 1
if limit and limit <= num_results:
return
def get_event(self, id: int) -> Event:
filename = self._get_filename_for_id(id, self.user_id)

View File

@@ -23,6 +23,7 @@ class EventStoreABC:
end_id: int | None = None,
reverse: bool = False,
filter: EventFilter | None = None,
limit: int | None = None,
) -> Iterable[Event]:
"""
Retrieve events from the event stream, optionally excluding events using a filter

View File

@@ -1,8 +1,11 @@
from fastapi import APIRouter, Request, status
from fastapi import APIRouter, HTTPException, Request, status
from fastapi.responses import JSONResponse
from openhands.core.logger import openhands_logger as logger
from openhands.events.event_filter import EventFilter
from openhands.events.serialization.event import event_to_dict
from openhands.runtime.base import Runtime
from openhands.server.shared import conversation_manager
app = APIRouter(prefix='/api/conversations/{conversation_id}')
@@ -91,3 +94,66 @@ async def get_hosts(request: Request) -> JSONResponse:
'error': f'Error getting runtime hosts: {e}',
},
)
@app.get('/events')
async def search_events(
request: Request,
start_id: int = 0,
end_id: int | None = None,
reverse: bool = False,
filter: EventFilter | None = None,
limit: int = 20
):
"""Search through the event stream with filtering and pagination.
Args:
request: The incoming request object
start_id: Starting ID in the event stream. Defaults to 0
end_id: Ending ID in the event stream
reverse: Whether to retrieve events in reverse order. Defaults to False.
filter: Filter for events
limit: Maximum number of events to return. Must be between 1 and 100. Defaults to 20
Returns:
dict: Dictionary containing:
- events: List of matching events
- has_more: Whether there are more matching events after this batch
Raises:
HTTPException: If conversation is not found
ValueError: If limit is less than 1 or greater than 100
"""
if not request.state.conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail='Conversation not found'
)
if limit < 0 or limit > 100:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid limit'
)
# Get matching events from the stream
event_stream = request.state.conversation.event_stream
events = list(event_stream.search_events(
start_id=start_id,
end_id=end_id,
reverse=reverse,
filter=filter,
limit=limit + 1,
))
# Check if there are more events
has_more = len(events) > limit
if has_more:
events = events[:limit] # Remove the extra event
events = [event_to_dict(event) for event in events]
return {
'events': events,
'has_more': has_more,
}
@app.post('/events')
async def add_event(request: Request):
data = request.json()
conversation_manager.send_to_event_stream(request.state.sid, data)
return JSONResponse({"success": True})

View File

@@ -441,6 +441,147 @@ def test_cache_page_performance(temp_dir: str):
# In real-world scenarios with many more events, the performance difference would be more significant.
def test_search_events_limit(temp_dir: str):
"""Test that the search_events method correctly applies the limit parameter."""
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('abc', file_store)
# Add 10 events
for i in range(10):
event_stream.add_event(NullObservation(f'test{i}'), EventSource.AGENT)
# Test with no limit (should return all events)
events = list(event_stream.search_events())
assert len(events) == 10
# Test with limit=5 (should return first 5 events)
events = list(event_stream.search_events(limit=5))
assert len(events) == 5
assert all(isinstance(e, NullObservation) for e in events)
assert [e.content for e in events] == ['test0', 'test1', 'test2', 'test3', 'test4']
# Test with limit=3 and start_id=5 (should return 3 events starting from ID 5)
events = list(event_stream.search_events(start_id=5, limit=3))
assert len(events) == 3
assert [e.content for e in events] == ['test5', 'test6', 'test7']
# Test with limit and reverse=True (should return events in reverse order)
events = list(event_stream.search_events(reverse=True, limit=4))
assert len(events) == 4
assert [e.content for e in events] == ['test9', 'test8', 'test7', 'test6']
# Test with limit and filter (should apply limit after filtering)
# Add some events with different content for filtering
event_stream.add_event(NullObservation('filter_me'), EventSource.AGENT)
event_stream.add_event(NullObservation('filter_me_too'), EventSource.AGENT)
events = list(
event_stream.search_events(filter=EventFilter(query='filter'), limit=1)
)
assert len(events) == 1
assert events[0].content == 'filter_me'
def test_search_events_limit_with_complex_filters(temp_dir: str):
"""Test the interaction between limit and various filter combinations in search_events."""
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('abc', file_store)
# Add events with different sources and types
event_stream.add_event(NullAction(), EventSource.AGENT) # id 0
event_stream.add_event(NullObservation('test1'), EventSource.AGENT) # id 1
event_stream.add_event(MessageAction(content='hello'), EventSource.USER) # id 2
event_stream.add_event(NullObservation('test2'), EventSource.ENVIRONMENT) # id 3
event_stream.add_event(NullAction(), EventSource.AGENT) # id 4
event_stream.add_event(MessageAction(content='world'), EventSource.USER) # id 5
event_stream.add_event(NullObservation('hello world'), EventSource.AGENT) # id 6
# Test limit with type filter
events = list(
event_stream.search_events(
filter=EventFilter(include_types=(NullAction,)), limit=1
)
)
assert len(events) == 1
assert isinstance(events[0], NullAction)
assert events[0].id == 0
# Test limit with source filter
events = list(
event_stream.search_events(filter=EventFilter(source='user'), limit=1)
)
assert len(events) == 1
assert events[0].source == EventSource.USER
assert events[0].id == 2
# Test limit with query filter
events = list(
event_stream.search_events(filter=EventFilter(query='hello'), limit=2)
)
assert len(events) == 2
assert [e.id for e in events] == [2, 6]
# Test limit with combined filters
events = list(
event_stream.search_events(
filter=EventFilter(source='agent', include_types=(NullObservation,)),
limit=1,
)
)
assert len(events) == 1
assert isinstance(events[0], NullObservation)
assert events[0].source == EventSource.AGENT
assert events[0].id == 1
# Test limit with reverse and filter
events = list(
event_stream.search_events(
filter=EventFilter(source='agent'), reverse=True, limit=2
)
)
assert len(events) == 2
assert [e.id for e in events] == [6, 4]
def test_search_events_limit_edge_cases(temp_dir: str):
"""Test edge cases for the limit parameter in search_events."""
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('abc', file_store)
# Add some events
for i in range(5):
event_stream.add_event(NullObservation(f'test{i}'), EventSource.AGENT)
# Test with limit=None (should return all events)
events = list(event_stream.search_events(limit=None))
assert len(events) == 5
# Test with limit larger than number of events
events = list(event_stream.search_events(limit=10))
assert len(events) == 5
# Test with limit=0 (let's check actual behavior)
events = list(event_stream.search_events(limit=0))
# If it returns all events, assert len(events) == 5
# If it returns no events, assert len(events) == 0
# Let's check the actual behavior
assert len(events) in [0, 5]
# Test with negative limit (implementation returns only first event)
events = list(event_stream.search_events(limit=-1))
assert len(events) == 1
# Test with empty result set and limit
events = list(
event_stream.search_events(filter=EventFilter(query='nonexistent'), limit=5)
)
assert len(events) == 0
# Test with start_id beyond available events
events = list(event_stream.search_events(start_id=10, limit=5))
assert len(events) == 0
def test_callback_dictionary_modification(temp_dir: str):
"""Test that the event stream can handle dictionary modification during iteration.