diff --git a/openhands/core/config/openhands_config.py b/openhands/core/config/openhands_config.py index 65ea78ddce..792aabb7c0 100644 --- a/openhands/core/config/openhands_config.py +++ b/openhands/core/config/openhands_config.py @@ -72,6 +72,7 @@ class OpenHandsConfig(BaseModel): file_store_path: str = Field(default='~/.openhands') file_store_web_hook_url: str | None = Field(default=None) file_store_web_hook_headers: dict | None = Field(default=None) + file_store_web_hook_batch: bool = Field(default=False) enable_browser: bool = Field(default=True) save_trajectory_path: str | None = Field(default=None) save_screenshots_in_trajectory: bool = Field(default=False) diff --git a/openhands/server/shared.py b/openhands/server/shared.py index 715cd0e630..94a910fb62 100644 --- a/openhands/server/shared.py +++ b/openhands/server/shared.py @@ -31,6 +31,7 @@ file_store: FileStore = get_file_store( config.file_store_path, config.file_store_web_hook_url, config.file_store_web_hook_headers, + batch=config.file_store_web_hook_batch, ) client_manager = None diff --git a/openhands/storage/README.md b/openhands/storage/README.md index 07935c0538..7a638a6cc7 100644 --- a/openhands/storage/README.md +++ b/openhands/storage/README.md @@ -61,9 +61,12 @@ The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP r **Configuration Options:** - `file_store_web_hook_url`: The base URL for webhook requests - `file_store_web_hook_headers`: HTTP headers to include in webhook requests +- `file_store_web_hook_batch`: Whether to use batched webhook requests (default: false) ### Protocol Details +#### Standard Webhook Protocol (Non-Batched) + 1. 
**File Write Operation**: - When a file is written, a POST request is sent to `{base_url}{path}` - The request body contains the file contents @@ -73,6 +76,27 @@ The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP r - When a file is deleted, a DELETE request is sent to `{base_url}{path}` - The operation is retried up to 3 times with a 1-second delay between attempts +#### Batched Webhook Protocol + +The `BatchedWebHookFileStore` extends the webhook functionality by batching multiple file operations into a single request, which can significantly improve performance when many files are being modified in a short period of time. + +1. **Batch Request**: + - A single POST request is sent to `{base_url}` with a JSON array in the body + - Each item in the array contains: + - `method`: "POST" for write operations, "DELETE" for delete operations + - `path`: The file path + - `content`: The file contents (for write operations only) + - `encoding`: "base64" if binary content was base64-encoded (optional) + +2. **Batch Triggering**: + - Batches are sent when one of the following conditions is met: + - A timeout period has elapsed (defaults to 5 seconds, configurable via constructor parameter) + - The total size of batched content exceeds a size limit (defaults to 1MB, configurable via constructor parameter) + - The `flush()` method is explicitly called + +3. 
**Error Handling**: + - The batch request is retried up to 3 times with a 1-second delay between attempts + ## Configuration To configure the storage module in OpenHands, use the following configuration options: @@ -90,4 +114,14 @@ file_store_web_hook_url = "https://example.com/api/files" # Optional webhook headers (JSON string) file_store_web_hook_headers = '{"Authorization": "Bearer token"}' + +# Optional batched webhook mode (default: false) +file_store_web_hook_batch = true ``` + +**Batched Webhook Configuration:** +The batched webhook behavior uses predefined constants with the following default values: +- Batch timeout: 5 seconds +- Batch size limit: 1MB (1048576 bytes) + +These values can be customized by passing `batch_timeout_seconds` and `batch_size_limit_bytes` parameters to the `BatchedWebHookFileStore` constructor. diff --git a/openhands/storage/__init__.py b/openhands/storage/__init__.py index 262668ffd5..d5f7dd7089 100644 --- a/openhands/storage/__init__.py +++ b/openhands/storage/__init__.py @@ -2,6 +2,7 @@ import os import httpx +from openhands.storage.batched_web_hook import BatchedWebHookFileStore from openhands.storage.files import FileStore from openhands.storage.google_cloud import GoogleCloudFileStore from openhands.storage.local import LocalFileStore @@ -15,6 +16,7 @@ def get_file_store( file_store_path: str | None = None, file_store_web_hook_url: str | None = None, file_store_web_hook_headers: dict | None = None, + batch: bool = False, ) -> FileStore: store: FileStore if file_store_type == 'local': @@ -35,9 +37,21 @@ def get_file_store( file_store_web_hook_headers['X-Session-API-Key'] = os.getenv( 'SESSION_API_KEY' ) - store = WebHookFileStore( - store, - file_store_web_hook_url, - httpx.Client(headers=file_store_web_hook_headers or {}), - ) + + client = httpx.Client(headers=file_store_web_hook_headers or {}) + + if batch: + # Use batched webhook file store + store = BatchedWebHookFileStore( + store, + file_store_web_hook_url, + client, + ) 
import base64
import logging
import threading

import httpx
import tenacity

from openhands.storage.files import FileStore
from openhands.utils.async_utils import EXECUTOR

logger = logging.getLogger(__name__)

# Default batching configuration; both values may be overridden per
# instance via the constructor.
WEBHOOK_BATCH_TIMEOUT_SECONDS = 5.0
WEBHOOK_BATCH_SIZE_LIMIT_BYTES = 1048576  # 1MB


class BatchedWebHookFileStore(FileStore):
    """File store which batches updates before sending them to a webhook.

    Wraps another FileStore implementation and records every write/delete.
    Pending updates are flushed to ``base_url`` as a single POST request
    whose body is a JSON array (one item per file operation) when any of
    the following occurs:

    * the batch timeout elapses,
    * the accumulated content size crosses the size limit, or
    * ``flush()`` is called explicitly.

    Repeated updates to the same path within one batch window are
    coalesced: only the most recent operation per path is sent.
    """

    file_store: FileStore  # underlying store that actually persists data
    base_url: str  # webhook endpoint receiving the batched payload
    client: httpx.Client  # HTTP client used for webhook requests
    batch_timeout_seconds: float  # seconds before a timer-driven flush
    batch_size_limit_bytes: int  # queued bytes that force an immediate flush
    _batch_lock: threading.Lock  # guards all _batch* state below
    _batch: dict[str, tuple[str, str | bytes | None]]  # path -> (operation, contents)
    _batch_timer: threading.Timer | None  # pending flush timer, if any
    _batch_size: int  # running total of queued content bytes

    def __init__(
        self,
        file_store: FileStore,
        base_url: str,
        client: httpx.Client | None = None,
        batch_timeout_seconds: float | None = None,
        batch_size_limit_bytes: int | None = None,
    ) -> None:
        """Initialize a BatchedWebHookFileStore.

        Args:
            file_store: The underlying FileStore implementation.
            base_url: The base URL for webhook requests.
            client: Optional HTTP client to use for requests. If None, a
                new client is created.
            batch_timeout_seconds: Time in seconds after which a batch is
                sent. If None, WEBHOOK_BATCH_TIMEOUT_SECONDS is used.
            batch_size_limit_bytes: Size limit in bytes after which a batch
                is sent. If None, WEBHOOK_BATCH_SIZE_LIMIT_BYTES is used.
        """
        self.file_store = file_store
        self.base_url = base_url
        self.client = client if client is not None else httpx.Client()

        # Compare against None explicitly (not `or`) so an explicit 0 is
        # honoured instead of being silently replaced by the default.
        self.batch_timeout_seconds = (
            WEBHOOK_BATCH_TIMEOUT_SECONDS
            if batch_timeout_seconds is None
            else batch_timeout_seconds
        )
        self.batch_size_limit_bytes = (
            WEBHOOK_BATCH_SIZE_LIMIT_BYTES
            if batch_size_limit_bytes is None
            else batch_size_limit_bytes
        )

        # Batch state; every access must hold _batch_lock.
        self._batch_lock = threading.Lock()
        self._batch = {}  # Maps path -> (operation, content)
        self._batch_timer = None
        self._batch_size = 0

    def write(self, path: str, contents: str | bytes) -> None:
        """Write contents to the underlying store and queue a webhook update.

        Args:
            path: The path to write to.
            contents: The contents to write.
        """
        self.file_store.write(path, contents)
        self._queue_update(path, 'write', contents)

    def read(self, path: str) -> str:
        """Read contents from the underlying store (no webhook traffic).

        Args:
            path: The path to read from.

        Returns:
            The contents of the file.
        """
        return self.file_store.read(path)

    def list(self, path: str) -> list[str]:
        """List files in a directory of the underlying store.

        Args:
            path: The directory path to list.

        Returns:
            A list of file paths.
        """
        return self.file_store.list(path)

    def delete(self, path: str) -> None:
        """Delete a file from the underlying store and queue a webhook update.

        Args:
            path: The path to delete.
        """
        self.file_store.delete(path)
        self._queue_update(path, 'delete', None)

    @staticmethod
    def _content_size(contents: str | bytes | None) -> int:
        """Return the size of ``contents`` in bytes (0 for None)."""
        if contents is None:
            return 0
        if isinstance(contents, str):
            return len(contents.encode('utf-8'))
        return len(contents)

    def _queue_update(
        self, path: str, operation: str, contents: str | bytes | None
    ) -> None:
        """Queue an update to be sent to the webhook.

        Args:
            path: The path that was modified.
            operation: The operation performed ('write' or 'delete').
            contents: The contents that were written (None for deletes).
        """
        with self._batch_lock:
            # Coalesce updates: only the newest operation per path is kept,
            # so subtract the size of any entry being replaced.
            if path in self._batch:
                _, prev_contents = self._batch[path]
                self._batch_size -= self._content_size(prev_contents)
            self._batch_size += self._content_size(contents)
            self._batch[path] = (operation, contents)

            if self._batch_size >= self.batch_size_limit_bytes:
                # Size limit reached: the batch is being sent now, so the
                # pending timer (if any) is obsolete. Submit the send to
                # the executor to avoid blocking the caller on network I/O.
                if self._batch_timer is not None:
                    self._batch_timer.cancel()
                    self._batch_timer = None
                EXECUTOR.submit(self._send_batch)
                return

            # Otherwise (re)start the countdown to the next timed flush.
            if self._batch_timer is not None:
                self._batch_timer.cancel()
            timer = threading.Timer(
                self.batch_timeout_seconds, self._send_batch_from_timer
            )
            timer.daemon = True
            timer.start()
            self._batch_timer = timer

    def _send_batch_from_timer(self) -> None:
        """Timer callback: hand the actual send off to the executor."""
        EXECUTOR.submit(self._send_batch)

    def _send_batch(self) -> None:
        """Send the current batch of updates to the webhook as one request.

        Takes a snapshot of the pending batch under the lock, resets the
        batch state, then performs the HTTP request outside the lock.
        """
        with self._batch_lock:
            if not self._batch:
                return
            # Copy the batch and clear the current one.
            batch_to_send = self._batch.copy()
            self._batch.clear()
            self._batch_size = 0
            # Cancel any pending timer.
            if self._batch_timer is not None:
                self._batch_timer.cancel()
                self._batch_timer = None

        try:
            self._send_batch_request(batch_to_send)
        except Exception:
            # Log with traceback instead of raising: webhook delivery is
            # best-effort and must not break file-store callers.
            logger.exception('Error sending webhook batch')

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
    )
    def _send_batch_request(
        self, batch: dict[str, tuple[str, str | bytes | None]]
    ) -> None:
        """Send a single batch request to the webhook URL with all updates.

        Retried up to 3 times with a 1-second delay between attempts.

        Args:
            batch: Dictionary mapping paths to (operation, contents) tuples.

        Raises:
            httpx.HTTPStatusError: If the webhook request fails.
        """
        batch_payload: list[dict[str, str]] = []
        for path, (operation, contents) in batch.items():
            item = {
                'method': 'POST' if operation == 'write' else 'DELETE',
                'path': path,
            }
            if operation == 'write' and contents is not None:
                if isinstance(contents, bytes):
                    try:
                        # Send text content verbatim when it is valid UTF-8.
                        item['content'] = contents.decode('utf-8')
                    except UnicodeDecodeError:
                        # Binary content is base64-encoded and flagged so
                        # the receiver knows to decode it.
                        item['content'] = base64.b64encode(contents).decode('ascii')
                        item['encoding'] = 'base64'
                else:
                    item['content'] = contents
            batch_payload.append(item)

        # Send the whole batch as a single request.
        response = self.client.post(self.base_url, json=batch_payload)
        response.raise_for_status()

    def flush(self) -> None:
        """Immediately send any pending updates to the webhook.

        Call this to ensure all updates are sent before shutting down.
        """
        self._send_batch()
"""Tests for BatchedWebHookFileStore.

A MagicMock httpx client is used so no real HTTP requests are made, and the
store is configured with a short (0.1 s) batch timeout so timer-driven
flushes can be observed by sleeping briefly.
"""

import time
from unittest.mock import MagicMock

import httpx
import pytest

from openhands.storage.batched_web_hook import BatchedWebHookFileStore
from openhands.storage.files import FileStore


class MockFileStore(FileStore):
    """Minimal in-memory FileStore used as the wrapped store in tests."""

    def __init__(self):
        # Maps path -> contents.
        self.files = {}

    def write(self, path: str, contents: str | bytes) -> None:
        self.files[path] = contents

    def read(self, path: str) -> str:
        return self.files.get(path, '')

    def list(self, path: str) -> list[str]:
        return [k for k in self.files.keys() if k.startswith(path)]

    def delete(self, path: str) -> None:
        if path in self.files:
            del self.files[path]


class TestBatchedWebHookFileStore:
    """Behavioural tests: batching, coalescing, size limits, flush, encoding."""

    @pytest.fixture
    def mock_client(self):
        # MagicMock with the httpx.Client spec; raise_for_status is a no-op
        # so every webhook request appears to succeed.
        client = MagicMock(spec=httpx.Client)
        client.post.return_value.raise_for_status = MagicMock()
        client.delete.return_value.raise_for_status = MagicMock()
        return client

    @pytest.fixture
    def file_store(self):
        return MockFileStore()

    @pytest.fixture
    def batched_store(self, file_store, mock_client):
        # Use a short timeout for testing
        return BatchedWebHookFileStore(
            file_store=file_store,
            base_url='http://example.com',
            client=mock_client,
            batch_timeout_seconds=0.1,  # Short timeout for testing
            batch_size_limit_bytes=1000,
        )

    def test_write_operation_batched(self, batched_store, mock_client):
        """A write is not sent immediately; the timer flush sends one POST."""
        # Write a file
        batched_store.write('/test.txt', 'Hello, world!')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Wait for the batch timeout
        time.sleep(0.2)

        # Now the client should have been called with a batch payload
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Hello, world!'

    def test_delete_operation_batched(self, batched_store, mock_client):
        """A write followed by a delete coalesces to a single DELETE item."""
        # Write and then delete a file
        batched_store.write('/test.txt', 'Hello, world!')
        batched_store.delete('/test.txt')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Wait for the batch timeout
        time.sleep(0.2)

        # Now the client should have been called with a batch payload
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'DELETE'
        assert batch_payload[0]['path'] == '/test.txt'
        assert 'content' not in batch_payload[0]

    def test_batch_size_limit_triggers_send(self, batched_store, mock_client):
        """Exceeding the configured size limit sends the batch without waiting."""
        # Write a large file that exceeds the batch size limit
        large_content = 'x' * 1001  # Exceeds the 1000 byte limit
        batched_store.write('/large.txt', large_content)

        # The batch might be sent asynchronously, so we need to wait a bit
        time.sleep(0.2)

        # The client should have been called due to size limit
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/large.txt'
        assert batch_payload[0]['content'] == large_content

    def test_multiple_updates_same_file(self, batched_store, mock_client):
        """Only the most recent write to a path survives into the batch."""
        # Write to the same file multiple times
        batched_store.write('/test.txt', 'Version 1')
        batched_store.write('/test.txt', 'Version 2')
        batched_store.write('/test.txt', 'Version 3')

        # Wait for the batch timeout
        time.sleep(0.2)

        # Only the latest version should be sent
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Version 3'

    def test_flush_sends_immediately(self, batched_store, mock_client):
        """flush() sends pending updates without waiting for the timer."""
        # Write a file
        batched_store.write('/test.txt', 'Hello, world!')

        # The client should not have been called yet
        mock_client.post.assert_not_called()

        # Flush the batch
        batched_store.flush()

        # Now the client should have been called without waiting for timeout
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/test.txt'
        assert batch_payload[0]['content'] == 'Hello, world!'

    def test_multiple_operations_in_single_batch(self, batched_store, mock_client):
        """Writes and deletes to different paths travel in one POST payload."""
        # Perform multiple operations
        batched_store.write('/file1.txt', 'Content 1')
        batched_store.write('/file2.txt', 'Content 2')
        batched_store.delete('/file3.txt')

        # Wait for the batch timeout
        time.sleep(0.2)

        # Check that only one POST request was made with all operations
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 3

        # Check each operation in the batch
        operations = {item['path']: item for item in batch_payload}

        assert '/file1.txt' in operations
        assert operations['/file1.txt']['method'] == 'POST'
        assert operations['/file1.txt']['content'] == 'Content 1'

        assert '/file2.txt' in operations
        assert operations['/file2.txt']['method'] == 'POST'
        assert operations['/file2.txt']['content'] == 'Content 2'

        assert '/file3.txt' in operations
        assert operations['/file3.txt']['method'] == 'DELETE'
        assert 'content' not in operations['/file3.txt']

    def test_binary_content_handling(self, batched_store, mock_client):
        """Non-UTF-8 bytes are base64-encoded and flagged with encoding=base64."""
        # Write binary content
        binary_content = b'\x00\x01\x02\x03\xff\xfe\xfd\xfc'
        batched_store.write('/binary.bin', binary_content)

        # Wait for the batch timeout
        time.sleep(0.2)

        # Check that the client was called
        mock_client.post.assert_called_once()
        args, kwargs = mock_client.post.call_args
        assert args[0] == 'http://example.com'
        assert 'json' in kwargs

        # Check the batch payload
        batch_payload = kwargs['json']
        assert isinstance(batch_payload, list)
        assert len(batch_payload) == 1

        # Binary content should be base64 encoded
        assert batch_payload[0]['method'] == 'POST'
        assert batch_payload[0]['path'] == '/binary.bin'
        assert 'content' in batch_payload[0]
        assert 'encoding' in batch_payload[0]
        assert batch_payload[0]['encoding'] == 'base64'

        # Verify the content can be decoded back to the original binary
        import base64

        decoded = base64.b64decode(batch_payload[0]['content'].encode('ascii'))
        assert decoded == binary_content