diff --git a/docs/modules/usage/configuration-options.md b/docs/modules/usage/configuration-options.md index d806309bdd..066188cdd3 100644 --- a/docs/modules/usage/configuration-options.md +++ b/docs/modules/usage/configuration-options.md @@ -66,9 +66,19 @@ The core configuration options are defined in the `[core]` section of the `confi - `file_store` - Type: `str` - - Default: `"memory"` + - Default: `"local"` - Description: File store type +- `file_store_web_hook_url` + - Type: `str` + - Default: `None` + - Description: Optional URL for a webhook to invoke after file store writes / deletes + +- `file_store_web_hook_headers` + - Type: `dict` + - Default: `None` + - Description: HTTP headers to include in web hook requests. + - `file_uploads_allowed_extensions` - Type: `list of str` - Default: `[".*"]` diff --git a/openhands/core/config/openhands_config.py b/openhands/core/config/openhands_config.py index d809135690..b5d8e60c2a 100644 --- a/openhands/core/config/openhands_config.py +++ b/openhands/core/config/openhands_config.py @@ -29,6 +29,8 @@ class OpenHandsConfig(BaseModel): runtime: Runtime environment identifier. file_store: Type of file store to use. file_store_path: Path to the file store. + file_store_web_hook_url: Optional url for file store web hook + file_store_web_hook_headers: Optional headers for file_store web hook save_trajectory_path: Either a folder path to store trajectories with auto-generated filenames, or a designated trajectory file path. save_screenshots_in_trajectory: Whether to save screenshots in trajectory (in encoded image format). replay_trajectory_path: Path to load trajectory and replay. If provided, trajectory would be replayed first before user's instruction. 
@@ -62,6 +64,8 @@ class OpenHandsConfig(BaseModel): runtime: str = Field(default='docker') file_store: str = Field(default='local') file_store_path: str = Field(default='/tmp/openhands_file_store') + file_store_web_hook_url: str | None = Field(default=None) + file_store_web_hook_headers: dict | None = Field(default=None) save_trajectory_path: str | None = Field(default=None) save_screenshots_in_trajectory: bool = Field(default=False) replay_trajectory_path: str | None = Field(default=None) diff --git a/openhands/server/shared.py b/openhands/server/shared.py index cc6167dfef..b62157983c 100644 --- a/openhands/server/shared.py +++ b/openhands/server/shared.py @@ -26,7 +26,12 @@ assert isinstance(server_config_interface, ServerConfig), ( 'Loaded server config interface is not a ServerConfig, despite this being assumed' ) server_config: ServerConfig = server_config_interface -file_store: FileStore = get_file_store(config.file_store, config.file_store_path) +file_store: FileStore = get_file_store( + config.file_store, + config.file_store_path, + config.file_store_web_hook_url, + config.file_store_web_hook_headers, +) client_manager = None redis_host = os.environ.get('REDIS_HOST') diff --git a/openhands/storage/README.md b/openhands/storage/README.md new file mode 100644 index 0000000000..1d5cf2566c --- /dev/null +++ b/openhands/storage/README.md @@ -0,0 +1,93 @@ +# OpenHands Storage Module + +The storage module provides different storage options for file operations in OpenHands, used for storing events, settings and other metadata. This module implements a common interface (`FileStore`) that allows for interchangeable storage backends. + +**Usage:** +```python + +store = ... + +# Write, read, list, and delete operations +store.write("example.txt", "Hello, world!") +content = store.read("example.txt") +files = store.list("/") +store.delete("example.txt") +``` + +## Available Storage Options + +### 1. 
Local File Storage (`local`) + +Local file storage saves files to the local filesystem. + +**Environment Variables:** +- None specific to this storage option +- Files are stored at the path specified by `file_store_path` in the configuration + +### 2. In-Memory Storage (`memory`) + +In-memory storage keeps files in memory, which is useful for testing or temporary storage. + +**Environment Variables:** +- None + +### 3. Amazon S3 Storage (`s3`) + +S3 storage uses Amazon S3 or compatible services for file storage. + +**Environment Variables:** +- The bucket name is specified by `file_store_path` in the configuration with a fallback to the `AWS_S3_BUCKET` environment variable. +- `AWS_ACCESS_KEY_ID`: Your AWS access key +- `AWS_SECRET_ACCESS_KEY`: Your AWS secret key +- `AWS_S3_ENDPOINT`: Optional custom endpoint for S3-compatible services (Allows overriding the default) +- `AWS_S3_SECURE`: Whether to use HTTPS (default: "true") + +### 4. Google Cloud Storage (`google_cloud`) + +Google Cloud Storage uses Google Cloud Storage buckets for file storage. + +**Environment Variables:** +- The bucket name is specified by `file_store_path` in the configuration with a fallback to the `GOOGLE_CLOUD_BUCKET_NAME` environment variable. +- `GOOGLE_APPLICATION_CREDENTIALS`: Path to Google Cloud credentials JSON file + +## Webhook Protocol + +The webhook protocol allows for integration with external systems by sending HTTP requests when files are written or deleted. + +### Overview + +The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP requests to a specified URL whenever files are written or deleted. This enables real-time notifications and synchronization with external systems. + +**Configuration Options:** +- `file_store_web_hook_url`: The base URL for webhook requests +- `file_store_web_hook_headers`: HTTP headers to include in webhook requests + +### Protocol Details + +1. 
**File Write Operation**: + - When a file is written, a POST request is sent to `{base_url}{path}` + - The request body contains the file contents + - The operation is retried up to 3 times with a 1-second delay between attempts + +2. **File Delete Operation**: + - When a file is deleted, a DELETE request is sent to `{base_url}{path}` + - The operation is retried up to 3 times with a 1-second delay between attempts + +## Configuration + +To configure the storage module in OpenHands, use the following configuration options: + +```toml +[core] +# File store type: "local", "memory", "s3", "google_cloud" +file_store = "local" + +# Path for local file store +file_store_path = "/tmp/file_store" + +# Optional webhook URL +file_store_web_hook_url = "https://example.com/api/files" + +# Optional webhook headers (JSON string) +file_store_web_hook_headers = '{"Authorization": "Bearer token"}' +``` diff --git a/openhands/storage/__init__.py b/openhands/storage/__init__.py index 570a494b36..262668ffd5 100644 --- a/openhands/storage/__init__.py +++ b/openhands/storage/__init__.py @@ -1,22 +1,43 @@ +import os + +import httpx + from openhands.storage.files import FileStore from openhands.storage.google_cloud import GoogleCloudFileStore -from openhands.storage.http import HTTPFileStore from openhands.storage.local import LocalFileStore from openhands.storage.memory import InMemoryFileStore from openhands.storage.s3 import S3FileStore +from openhands.storage.web_hook import WebHookFileStore -def get_file_store(file_store: str, file_store_path: str | None = None) -> FileStore: - if file_store == 'local': +def get_file_store( + file_store_type: str, + file_store_path: str | None = None, + file_store_web_hook_url: str | None = None, + file_store_web_hook_headers: dict | None = None, +) -> FileStore: + store: FileStore + if file_store_type == 'local': if file_store_path is None: raise ValueError('file_store_path is required for local file store') - return LocalFileStore(file_store_path) 
- elif file_store == 's3': - return S3FileStore(file_store_path) - elif file_store == 'google_cloud': - return GoogleCloudFileStore(file_store_path) - elif file_store == 'http': - if file_store_path is None: - raise ValueError('file_store_path is required for HTTP file store') - return HTTPFileStore(file_store_path) - return InMemoryFileStore() + store = LocalFileStore(file_store_path) + elif file_store_type == 's3': + store = S3FileStore(file_store_path) + elif file_store_type == 'google_cloud': + store = GoogleCloudFileStore(file_store_path) + else: + store = InMemoryFileStore() + if file_store_web_hook_url: + if file_store_web_hook_headers is None: + # Fallback to default headers. Use the session api key if it is defined in the env. + file_store_web_hook_headers = {} + if os.getenv('SESSION_API_KEY'): + file_store_web_hook_headers['X-Session-API-Key'] = os.getenv( + 'SESSION_API_KEY' + ) + store = WebHookFileStore( + store, + file_store_web_hook_url, + httpx.Client(headers=file_store_web_hook_headers or {}), + ) + return store diff --git a/openhands/storage/conversation/file_conversation_store.py b/openhands/storage/conversation/file_conversation_store.py index 78a5a664ea..03e0a62ec9 100644 --- a/openhands/storage/conversation/file_conversation_store.py +++ b/openhands/storage/conversation/file_conversation_store.py @@ -105,7 +105,7 @@ class FileConversationStore(ConversationStore): async def get_instance( cls, config: OpenHandsConfig, user_id: str | None ) -> FileConversationStore: - file_store = get_file_store(config.file_store, config.file_store_path) + file_store = get_file_store(config.file_store, config.file_store_path, config.file_store_web_hook_url, config.file_store_web_hook_headers) return FileConversationStore(file_store) diff --git a/openhands/storage/http.py b/openhands/storage/http.py deleted file mode 100644 index be5e342dbf..0000000000 --- a/openhands/storage/http.py +++ /dev/null @@ -1,206 +0,0 @@ -import json -import os -import urllib.parse 
-from typing import Union - -import httpx -from requests.exceptions import RequestException - -from openhands.core.logger import openhands_logger as logger -from openhands.storage.files import FileStore - - -class HTTPFileStore(FileStore): - """ - A FileStore implementation that uses HTTP requests to store and retrieve files. - - This implementation allows storing files on a remote HTTP server that implements - a simple REST API for file operations. - - The server should implement the following endpoints: - - POST /files/{path} - Write a file - - GET /files/{path} - Read a file - - OPTIONS /files/{path} - List files in a directory - - DELETE /files/{path} - Delete a file or directory - - Authentication can be provided by customizing the provided httpx client. - A (mock) server implementation is available in the MockHttpxClient class - located at /tests/unit/test_storage.py - """ - - base_url: str - client: httpx.Client - - def __init__( - self, - base_url: str, - client: httpx.Client | None = None, - ) -> None: - """ - Initialize the HTTP file store. - - Args: - base_url: The base URL of the HTTP file server - api_key: Optional API key for authentication - username: Optional username for basic authentication - password: Optional password for basic authentication - bearer_token: Optional bearer token for authentication - timeout: Request timeout in seconds - verify_ssl: Whether to verify SSL certificates - """ - self.base_url = base_url.rstrip('/') - if not client: - headers = {} - if os.getenv('SESSION_API_KEY'): - headers['X-Session-API-Key'] = os.getenv('SESSION_API_KEY') - client = httpx.Client(headers=headers) - self.client = client - - def _get_file_url(self, path: str) -> str: - """ - Get the full URL for a file path. 
- - Args: - path: The file path - - Returns: - The full URL - """ - # Ensure path starts with a slash - if not path.startswith('/'): - path = '/' + path - - # URL encode the path - encoded_path = urllib.parse.quote(path) - return f'{self.base_url}{encoded_path}' - - def write(self, path: str, contents: Union[str, bytes]) -> None: - """ - Write contents to a file. - - Args: - path: The file path - contents: The file contents (string or bytes) - - Raises: - FileNotFoundError: If the file cannot be written - """ - url = self._get_file_url(path) - - try: - # Convert string to bytes if needed - if isinstance(contents, str): - contents = contents.encode('utf-8') - - response = self.client.post(url, content=contents) - - if response.status_code not in (200, 201, 204): - raise FileNotFoundError( - f'Error: Failed to write to path {path}. ' - f'Status code: {response.status_code}, Response: {response.text}' - ) - - logger.debug(f'Successfully wrote to {path}') - - except RequestException as e: - raise FileNotFoundError(f'Error: Failed to write to path {path}: {str(e)}') - - def read(self, path: str) -> str: - """ - Read contents from a file. - - Args: - path: The file path - - Returns: - The file contents as a string - - Raises: - FileNotFoundError: If the file cannot be read - """ - url = self._get_file_url(path) - - try: - response = self.client.get(url) - - if response.status_code != 200: - raise FileNotFoundError( - f'Error: Failed to read from path {path}. ' - f'Status code: {response.status_code}, Response: {response.text}' - ) - - return response.text - - except RequestException as e: - raise FileNotFoundError(f'Error: Failed to read from path {path}: {str(e)}') - - def list(self, path: str) -> list[str]: - """ - List files in a directory. 
- - Args: - path: The directory path - - Returns: - A list of file paths - - Raises: - FileNotFoundError: If the directory cannot be listed - """ - url = f'{self._get_file_url(path)}' - - try: - response = self.client.options(url) - - if response.status_code != 200: - if response.status_code == 404: - return [] - - raise FileNotFoundError( - f'Error: Failed to list path {path}. ' - f'Status code: {response.status_code}, Response: {response.text}' - ) - - try: - files = response.json() - if not isinstance(files, list): - raise FileNotFoundError( - f'Error: Invalid response format when listing path {path}. ' - f'Expected a list, got: {type(files)}' - ) - return files - except json.JSONDecodeError: - raise FileNotFoundError( - f'Error: Invalid JSON response when listing path {path}. ' - f'Response: {response.text}' - ) - - except RequestException as e: - raise FileNotFoundError(f'Error: Failed to list path {path}: {str(e)}') - - def delete(self, path: str) -> None: - """ - Delete a file or directory. - - Args: - path: The file or directory path - - Raises: - FileNotFoundError: If the file or directory cannot be deleted - """ - url = self._get_file_url(path) - - try: - response = self.client.delete(url) - - # 404 is acceptable for delete operations - if response.status_code not in (200, 202, 204, 404): - raise FileNotFoundError( - f'Error: Failed to delete path {path}. 
class WebHookFileStore(FileStore):
    """
    File store which includes a web hook to be invoked after any changes occur.

    This class wraps another FileStore implementation. Reads and listings are
    delegated unchanged; writes and deletes are applied to the wrapped store
    first and then mirrored as HTTP requests to ``{base_url}/{path}``.

    Attributes:
        file_store: The underlying FileStore implementation
        base_url: The base URL for webhook requests
        client: The HTTP client used to make webhook requests
    """

    file_store: FileStore
    base_url: str
    client: httpx.Client

    def __init__(
        self, file_store: FileStore, base_url: str, client: httpx.Client | None = None
    ):
        """
        Initialize a WebHookFileStore.

        Args:
            file_store: The underlying FileStore implementation
            base_url: The base URL for webhook requests
            client: Optional HTTP client to use for requests. If None, a new client will be created.
        """
        self.file_store = file_store
        self.base_url = base_url
        self.client = client if client is not None else httpx.Client()

    def _build_url(self, path: str) -> str:
        """
        Join the base URL and a store path with exactly one slash.

        Plain concatenation fuses the two segments when neither side carries a
        slash (``.../files`` + ``a/b.json``) and doubles it when both do, so
        the separator is normalised here.

        Args:
            path: The file store path

        Returns:
            The full webhook URL for the path
        """
        return self.base_url.rstrip('/') + '/' + path.lstrip('/')

    def write(self, path: str, contents: str | bytes) -> None:
        """
        Write contents to a file and trigger a webhook.

        The wrapped store is written synchronously; the webhook call is handed
        to EXECUTOR and its result is discarded, so a failing webhook is not
        reported to the caller.
        NOTE(review): assumes EXECUTOR is a concurrent.futures-style executor
        that runs the callable in the background - confirm.

        Args:
            path: The path to write to
            contents: The contents to write
        """
        self.file_store.write(path, contents)
        EXECUTOR.submit(self._on_write, path, contents)

    def read(self, path: str) -> str:
        """
        Read contents from a file.

        Args:
            path: The path to read from

        Returns:
            The contents of the file
        """
        return self.file_store.read(path)

    def list(self, path: str) -> list[str]:
        """
        List files in a directory.

        Args:
            path: The directory path to list

        Returns:
            A list of file paths
        """
        return self.file_store.list(path)

    def delete(self, path: str) -> None:
        """
        Delete a file and trigger a webhook.

        The wrapped store is updated synchronously; the webhook call is handed
        to EXECUTOR and its result is discarded, so a failing webhook is not
        reported to the caller.

        Args:
            path: The path to delete
        """
        self.file_store.delete(path)
        EXECUTOR.submit(self._on_delete, path)

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
    )
    def _on_write(self, path: str, contents: str | bytes) -> None:
        """
        Send a POST request to the webhook URL when a file is written.

        This method is attempted up to 3 times with a 1-second delay between attempts.

        Args:
            path: The path that was written to
            contents: The contents that were written

        Raises:
            tenacity.RetryError: If the final attempt also fails. The retry
                decorator does not set ``reraise``, so the underlying httpx
                error is wrapped rather than raised directly.
        """
        response = self.client.post(self._build_url(path), content=contents)
        response.raise_for_status()

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
    )
    def _on_delete(self, path: str) -> None:
        """
        Send a DELETE request to the webhook URL when a file is deleted.

        This method is attempted up to 3 times with a 1-second delay between attempts.

        Args:
            path: The path that was deleted

        Raises:
            tenacity.RetryError: If the final attempt also fails. The retry
                decorator does not set ``reraise``, so the underlying httpx
                error is wrapped rather than raised directly.
        """
        response = self.client.delete(self._build_url(path))
        response.raise_for_status()
self.store = S3FileStore('dear-liza') -class TestHTTPFileStore(TestCase, _StorageTest): - def setUp(self): - self.store = HTTPFileStore('http://foo.com', MockHttpxClient('http://foo.com/')) - - # I would have liked to use cloud-storage-mocker here but the python versions were incompatible :( # If we write tests for the S3 storage class I would definitely recommend we use moto. class _MockGoogleCloudClient: @@ -281,37 +274,3 @@ class _MockS3Client: class _MockS3Object: key: str content: str | bytes - - -@dataclass -class MockHttpxClient: - base_url: str - file_store: FileStore = field(default_factory=InMemoryFileStore) - - def options(self, url: str): - path = self._get_path(url) - files = self.file_store.list(path) - return Response(200, json=files) - - def delete(self, url: str): - path = self._get_path(url) - self.file_store.delete(path) - return Response(200) - - def post(self, url: str, content: str | bytes): - path = self._get_path(url) - self.file_store.write(path, content) - return Response(200) - - def get(self, url: str): - path = self._get_path(url) - try: - content = self.file_store.read(path) - return Response(200, content=content) - except FileNotFoundError: - return Response(404) - - def _get_path(self, url: str): - assert url.startswith(self.base_url) - path = url[len(self.base_url) :] - return path