Added Webhooks to event store (#8763)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
tofarr
2025-05-29 07:39:08 -06:00
committed by GitHub
parent cb0a1c91e4
commit cc881a6bcb
13 changed files with 294 additions and 267 deletions

View File

@@ -66,9 +66,19 @@ The core configuration options are defined in the `[core]` section of the `confi
- `file_store`
- Type: `str`
- Default: `"memory"`
- Default: `"local"`
- Description: File store type
- `file_store_web_hook_url`
- Type: `str`
- Default: `None`
- Description: Optional url for a webhook to invoke after file store writes / deletes
- `file_store_web_hook_headers`
- Type: `dict`
- Default: `None`
- Description: HTTP Headers to include in web hook requests.
- `file_uploads_allowed_extensions`
- Type: `list of str`
- Default: `[".*"]`

View File

@@ -29,6 +29,8 @@ class OpenHandsConfig(BaseModel):
runtime: Runtime environment identifier.
file_store: Type of file store to use.
file_store_path: Path to the file store.
file_store_web_hook_url: Optional url for file store web hook
file_store_web_hook_headers: Optional headers for file_store web hook
save_trajectory_path: Either a folder path to store trajectories with auto-generated filenames, or a designated trajectory file path.
save_screenshots_in_trajectory: Whether to save screenshots in trajectory (in encoded image format).
replay_trajectory_path: Path to load trajectory and replay. If provided, trajectory would be replayed first before user's instruction.
@@ -62,6 +64,8 @@ class OpenHandsConfig(BaseModel):
runtime: str = Field(default='docker')
file_store: str = Field(default='local')
file_store_path: str = Field(default='/tmp/openhands_file_store')
file_store_web_hook_url: str | None = Field(default=None)
file_store_web_hook_headers: dict | None = Field(default=None)
save_trajectory_path: str | None = Field(default=None)
save_screenshots_in_trajectory: bool = Field(default=False)
replay_trajectory_path: str | None = Field(default=None)

View File

@@ -26,7 +26,12 @@ assert isinstance(server_config_interface, ServerConfig), (
'Loaded server config interface is not a ServerConfig, despite this being assumed'
)
server_config: ServerConfig = server_config_interface
file_store: FileStore = get_file_store(config.file_store, config.file_store_path)
file_store: FileStore = get_file_store(
config.file_store,
config.file_store_path,
config.file_store_web_hook_url,
config.file_store_web_hook_headers,
)
client_manager = None
redis_host = os.environ.get('REDIS_HOST')

View File

@@ -0,0 +1,93 @@
# OpenHands Storage Module
The storage module provides different storage options for file operations in OpenHands, used for storing events, settings and other metadata. This module implements a common interface (`FileStore`) that allows for interchangeable storage backends.
**Usage:**
```python
store = ...
# Write, read, list, and delete operations
store.write("example.txt", "Hello, world!")
content = store.read("example.txt")
files = store.list("/")
store.delete("example.txt")
```
## Available Storage Options
### 1. Local File Storage (`local`)
Local file storage saves files to the local filesystem.
**Environment Variables:**
- None specific to this storage option
- Files are stored at the path specified by `file_store_path` in the configuration
### 2. In-Memory Storage (`memory`)
In-memory storage keeps files in memory, which is useful for testing or temporary storage.
**Environment Variables:**
- None
### 3. Amazon S3 Storage (`s3`)
S3 storage uses Amazon S3 or compatible services for file storage.
**Environment Variables:**
- The bucket name is specified by `file_store_path` in the configuration, with a fallback to the `AWS_S3_BUCKET` environment variable.
- `AWS_ACCESS_KEY_ID`: Your AWS access key
- `AWS_SECRET_ACCESS_KEY`: Your AWS secret key
- `AWS_S3_ENDPOINT`: Optional custom endpoint for S3-compatible services (Allows overriding the default)
- `AWS_S3_SECURE`: Whether to use HTTPS (default: "true")
### 4. Google Cloud Storage (`google_cloud`)
Google Cloud Storage uses Google Cloud Storage buckets for file storage.
**Environment Variables:**
- The bucket name is specified by `file_store_path` in the configuration, with a fallback to the `GOOGLE_CLOUD_BUCKET_NAME` environment variable.
- `GOOGLE_APPLICATION_CREDENTIALS`: Path to Google Cloud credentials JSON file
## Webhook Protocol
The webhook protocol allows for integration with external systems by sending HTTP requests when files are written or deleted.
### Overview
The `WebHookFileStore` wraps another `FileStore` implementation and sends HTTP requests to a specified URL whenever files are written or deleted. This enables real-time notifications and synchronization with external systems.
**Configuration Options:**
- `file_store_web_hook_url`: The base URL for webhook requests
- `file_store_web_hook_headers`: HTTP headers to include in webhook requests
### Protocol Details
1. **File Write Operation**:
- When a file is written, a POST request is sent to `{base_url}{path}`
- The request body contains the file contents
- The request is attempted up to 3 times in total, with a 1-second delay between attempts
2. **File Delete Operation**:
- When a file is deleted, a DELETE request is sent to `{base_url}{path}`
- The request is attempted up to 3 times in total, with a 1-second delay between attempts
## Configuration
To configure the storage module in OpenHands, use the following configuration options:
```toml
[core]
# File store type: "local", "memory", "s3", "google_cloud"
file_store = "local"
# Path for local file store
file_store_path = "/tmp/file_store"
# Optional webhook URL
file_store_web_hook_url = "https://example.com/api/files"
# Optional webhook headers (TOML inline table, loaded as a dict)
file_store_web_hook_headers = { Authorization = "Bearer token" }
```

View File

@@ -1,22 +1,43 @@
import os
import httpx
from openhands.storage.files import FileStore
from openhands.storage.google_cloud import GoogleCloudFileStore
from openhands.storage.http import HTTPFileStore
from openhands.storage.local import LocalFileStore
from openhands.storage.memory import InMemoryFileStore
from openhands.storage.s3 import S3FileStore
from openhands.storage.web_hook import WebHookFileStore
def get_file_store(file_store: str, file_store_path: str | None = None) -> FileStore:
if file_store == 'local':
def get_file_store(
file_store_type: str,
file_store_path: str | None = None,
file_store_web_hook_url: str | None = None,
file_store_web_hook_headers: dict | None = None,
) -> FileStore:
store: FileStore
if file_store_type == 'local':
if file_store_path is None:
raise ValueError('file_store_path is required for local file store')
return LocalFileStore(file_store_path)
elif file_store == 's3':
return S3FileStore(file_store_path)
elif file_store == 'google_cloud':
return GoogleCloudFileStore(file_store_path)
elif file_store == 'http':
if file_store_path is None:
raise ValueError('file_store_path is required for HTTP file store')
return HTTPFileStore(file_store_path)
return InMemoryFileStore()
store = LocalFileStore(file_store_path)
elif file_store_type == 's3':
store = S3FileStore(file_store_path)
elif file_store_type == 'google_cloud':
store = GoogleCloudFileStore(file_store_path)
else:
store = InMemoryFileStore()
if file_store_web_hook_url:
if file_store_web_hook_headers is None:
# Fallback to default headers. Use the session api key if it is defined in the env.
file_store_web_hook_headers = {}
if os.getenv('SESSION_API_KEY'):
file_store_web_hook_headers['X-Session-API-Key'] = os.getenv(
'SESSION_API_KEY'
)
store = WebHookFileStore(
store,
file_store_web_hook_url,
httpx.Client(headers=file_store_web_hook_headers or {}),
)
return store

View File

@@ -105,7 +105,7 @@ class FileConversationStore(ConversationStore):
async def get_instance(
cls, config: OpenHandsConfig, user_id: str | None
) -> FileConversationStore:
file_store = get_file_store(config.file_store, config.file_store_path)
file_store = get_file_store(config.file_store, config.file_store_path, config.file_store_web_hook_url, config.file_store_web_hook_headers)
return FileConversationStore(file_store)

View File

@@ -1,206 +0,0 @@
import json
import os
import urllib.parse
from typing import Union
import httpx
from requests.exceptions import RequestException
from openhands.core.logger import openhands_logger as logger
from openhands.storage.files import FileStore
class HTTPFileStore(FileStore):
    """
    A FileStore implementation that uses HTTP requests to store and retrieve files.

    This implementation allows storing files on a remote HTTP server that implements
    a simple REST API for file operations.

    The server should implement the following endpoints:
    - POST /files/{path} - Write a file
    - GET /files/{path} - Read a file
    - OPTIONS /files/{path} - List files in a directory
    - DELETE /files/{path} - Delete a file or directory

    Authentication can be provided by customizing the provided httpx client.

    A (mock) server implementation is available in the MockHttpxClient class
    located at /tests/unit/test_storage.py
    """

    base_url: str
    client: httpx.Client

    # Transport-level failures that are translated into FileNotFoundError.
    # The client is an httpx.Client, so httpx.HTTPError is what is actually raised;
    # RequestException is kept so anything caught previously is still caught.
    _REQUEST_ERRORS = (httpx.HTTPError, RequestException)

    def __init__(
        self,
        base_url: str,
        client: httpx.Client | None = None,
    ) -> None:
        """
        Initialize the HTTP file store.

        Args:
            base_url: The base URL of the HTTP file server. Trailing slashes are stripped.
            client: Optional httpx client to use for all requests. When omitted, a
                client is created that sends the `X-Session-API-Key` header if the
                `SESSION_API_KEY` environment variable is set.
        """
        self.base_url = base_url.rstrip('/')
        if not client:
            headers = {}
            session_api_key = os.getenv('SESSION_API_KEY')
            if session_api_key:
                headers['X-Session-API-Key'] = session_api_key
            client = httpx.Client(headers=headers)
        self.client = client

    def _get_file_url(self, path: str) -> str:
        """
        Get the full URL for a file path.

        Args:
            path: The file path

        Returns:
            The full URL
        """
        # Ensure path starts with a slash
        if not path.startswith('/'):
            path = '/' + path
        # URL encode the path
        encoded_path = urllib.parse.quote(path)
        return f'{self.base_url}{encoded_path}'

    def write(self, path: str, contents: Union[str, bytes]) -> None:
        """
        Write contents to a file.

        Args:
            path: The file path
            contents: The file contents (string or bytes)

        Raises:
            FileNotFoundError: If the file cannot be written
        """
        url = self._get_file_url(path)
        # Convert string to bytes if needed
        if isinstance(contents, str):
            contents = contents.encode('utf-8')
        try:
            response = self.client.post(url, content=contents)
        except self._REQUEST_ERRORS as e:
            raise FileNotFoundError(
                f'Error: Failed to write to path {path}: {str(e)}'
            ) from e
        if response.status_code not in (200, 201, 204):
            raise FileNotFoundError(
                f'Error: Failed to write to path {path}. '
                f'Status code: {response.status_code}, Response: {response.text}'
            )
        logger.debug(f'Successfully wrote to {path}')

    def read(self, path: str) -> str:
        """
        Read contents from a file.

        Args:
            path: The file path

        Returns:
            The file contents as a string

        Raises:
            FileNotFoundError: If the file cannot be read
        """
        url = self._get_file_url(path)
        try:
            response = self.client.get(url)
        except self._REQUEST_ERRORS as e:
            raise FileNotFoundError(
                f'Error: Failed to read from path {path}: {str(e)}'
            ) from e
        if response.status_code != 200:
            raise FileNotFoundError(
                f'Error: Failed to read from path {path}. '
                f'Status code: {response.status_code}, Response: {response.text}'
            )
        return response.text

    def list(self, path: str) -> list[str]:
        """
        List files in a directory.

        Args:
            path: The directory path

        Returns:
            A list of file paths; an empty list when the server answers 404

        Raises:
            FileNotFoundError: If the directory cannot be listed
        """
        url = self._get_file_url(path)
        try:
            response = self.client.options(url)
        except self._REQUEST_ERRORS as e:
            raise FileNotFoundError(
                f'Error: Failed to list path {path}: {str(e)}'
            ) from e
        # A missing directory is reported as empty rather than as an error
        if response.status_code == 404:
            return []
        if response.status_code != 200:
            raise FileNotFoundError(
                f'Error: Failed to list path {path}. '
                f'Status code: {response.status_code}, Response: {response.text}'
            )
        try:
            files = response.json()
        except json.JSONDecodeError as e:
            raise FileNotFoundError(
                f'Error: Invalid JSON response when listing path {path}. '
                f'Response: {response.text}'
            ) from e
        if not isinstance(files, list):
            raise FileNotFoundError(
                f'Error: Invalid response format when listing path {path}. '
                f'Expected a list, got: {type(files)}'
            )
        return files

    def delete(self, path: str) -> None:
        """
        Delete a file or directory.

        Args:
            path: The file or directory path

        Raises:
            FileNotFoundError: If the file or directory cannot be deleted
        """
        url = self._get_file_url(path)
        try:
            response = self.client.delete(url)
        except self._REQUEST_ERRORS as e:
            raise FileNotFoundError(
                f'Error: Failed to delete path {path}: {str(e)}'
            ) from e
        # 404 is acceptable for delete operations
        if response.status_code not in (200, 202, 204, 404):
            raise FileNotFoundError(
                f'Error: Failed to delete path {path}. '
                f'Status code: {response.status_code}, Response: {response.text}'
            )
        logger.debug(f'Successfully deleted {path}')

View File

@@ -39,5 +39,10 @@ class FileSecretsStore(SecretsStore):
async def get_instance(
cls, config: OpenHandsConfig, user_id: str | None
) -> FileSecretsStore:
file_store = get_file_store(config.file_store, config.file_store_path)
# Build the file store from config; the web hook settings are forwarded as-is.
file_store = get_file_store(
    config.file_store,
    config.file_store_path,
    config.file_store_web_hook_url,
    config.file_store_web_hook_headers,
)
return FileSecretsStore(file_store)

View File

@@ -33,5 +33,10 @@ class FileSettingsStore(SettingsStore):
async def get_instance(
cls, config: OpenHandsConfig, user_id: str | None
) -> FileSettingsStore:
file_store = get_file_store(config.file_store, config.file_store_path)
# Build the file store from config; the web hook settings are forwarded as-is.
file_store = get_file_store(
    config.file_store,
    config.file_store_path,
    config.file_store_web_hook_url,
    config.file_store_web_hook_headers,
)
return FileSettingsStore(file_store)

View File

@@ -0,0 +1,126 @@
import httpx
import tenacity
from openhands.storage.files import FileStore
from openhands.utils.async_utils import EXECUTOR
class WebHookFileStore(FileStore):
    """
    File store which includes a web hook to be invoked after any changes occur.

    This class wraps another FileStore implementation and sends HTTP requests
    to a specified URL whenever files are written or deleted. Reads and listings
    are delegated to the wrapped store without any web hook call.

    Attributes:
        file_store: The underlying FileStore implementation
        base_url: The base URL for webhook requests
        client: The HTTP client used to make webhook requests
    """

    file_store: FileStore
    base_url: str
    client: httpx.Client

    def __init__(
        self, file_store: FileStore, base_url: str, client: httpx.Client | None = None
    ):
        """
        Initialize a WebHookFileStore.

        Args:
            file_store: The underlying FileStore implementation
            base_url: The base URL for webhook requests
            client: Optional HTTP client to use for requests. If None, a new client will be created.
        """
        self.file_store = file_store
        self.base_url = base_url
        if client is None:
            client = httpx.Client()
        self.client = client

    def write(self, path: str, contents: str | bytes) -> None:
        """
        Write contents to a file and trigger a webhook.

        The write to the underlying store happens first; the webhook call is then
        handed to EXECUTOR and its result is not inspected here.

        Args:
            path: The path to write to
            contents: The contents to write
        """
        self.file_store.write(path, contents)
        EXECUTOR.submit(self._on_write, path, contents)

    def read(self, path: str) -> str:
        """
        Read contents from a file.

        Args:
            path: The path to read from

        Returns:
            The contents of the file
        """
        return self.file_store.read(path)

    def list(self, path: str) -> list[str]:
        """
        List files in a directory.

        Args:
            path: The directory path to list

        Returns:
            A list of file paths
        """
        return self.file_store.list(path)

    def delete(self, path: str) -> None:
        """
        Delete a file and trigger a webhook.

        The delete on the underlying store happens first; the webhook call is then
        handed to EXECUTOR and its result is not inspected here.

        Args:
            path: The path to delete
        """
        self.file_store.delete(path)
        EXECUTOR.submit(self._on_delete, path)

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
        # Surface the underlying httpx error instead of tenacity.RetryError
        reraise=True,
    )
    def _on_write(self, path: str, contents: str | bytes) -> None:
        """
        Send a POST request to the webhook URL when a file is written.

        The request is attempted up to 3 times in total, waiting 1 second between
        attempts. The target URL is `base_url` concatenated with `path`.

        Args:
            path: The path that was written to
            contents: The contents that were written

        Raises:
            httpx.HTTPStatusError: If the last attempt returns an error status.
                Other errors raised by the client on the last attempt propagate unchanged.
        """
        url = self.base_url + path
        response = self.client.post(url, content=contents)
        response.raise_for_status()

    @tenacity.retry(
        wait=tenacity.wait_fixed(1),
        stop=tenacity.stop_after_attempt(3),
        # Surface the underlying httpx error instead of tenacity.RetryError
        reraise=True,
    )
    def _on_delete(self, path: str) -> None:
        """
        Send a DELETE request to the webhook URL when a file is deleted.

        The request is attempted up to 3 times in total, waiting 1 second between
        attempts. The target URL is `base_url` concatenated with `path`.

        Args:
            path: The path that was deleted

        Raises:
            httpx.HTTPStatusError: If the last attempt returns an error status.
                Other errors raised by the client on the last attempt propagate unchanged.
        """
        url = self.base_url + path
        response = self.client.delete(url)
        response.raise_for_status()

View File

@@ -263,7 +263,12 @@ def _load_runtime(
if override_mcp_config is not None:
config.mcp = override_mcp_config
file_store = get_file_store(config.file_store, config.file_store_path)
# Build the file store from config; the web hook settings are forwarded as-is.
file_store = get_file_store(
    config.file_store,
    config.file_store_path,
    config.file_store_web_hook_url,
    config.file_store_web_hook_headers,
)
event_stream = EventStream(sid, file_store)
runtime = runtime_cls(

View File

@@ -86,4 +86,4 @@ async def test_get_instance():
assert isinstance(store, FileSettingsStore)
assert store.file_store == mock_store
mock_get_store.assert_called_once_with('local', '/test/path')
mock_get_store.assert_called_once_with('local', '/test/path', None, None)

View File

@@ -11,11 +11,9 @@ from unittest.mock import patch
import botocore.exceptions
from google.api_core.exceptions import NotFound
from httpx import Response
from openhands.storage.files import FileStore
from openhands.storage.google_cloud import GoogleCloudFileStore
from openhands.storage.http import HTTPFileStore
from openhands.storage.local import LocalFileStore
from openhands.storage.memory import InMemoryFileStore
from openhands.storage.s3 import S3FileStore
@@ -142,11 +140,6 @@ class TestS3FileStore(TestCase, _StorageTest):
self.store = S3FileStore('dear-liza')
class TestHTTPFileStore(TestCase, _StorageTest):
    """Storage test case whose store is an HTTPFileStore talking to MockHttpxClient."""

    def setUp(self):
        # The mock client replaces a real HTTP server, so no network access is needed.
        self.store = HTTPFileStore('http://foo.com', MockHttpxClient('http://foo.com/'))
# I would have liked to use cloud-storage-mocker here but the python versions were incompatible :(
# If we write tests for the S3 storage class I would definitely recommend we use moto.
class _MockGoogleCloudClient:
@@ -281,37 +274,3 @@ class _MockS3Client:
class _MockS3Object:
key: str
content: str | bytes
@dataclass
class MockHttpxClient:
    """
    Stand-in for an httpx client that serves requests from a local FileStore.

    Each HTTP verb is mapped onto the matching FileStore operation, using the
    part of the URL after `base_url` as the file path.
    """

    base_url: str
    # Backing store; defaults to a fresh in-memory store per client
    file_store: FileStore = field(default_factory=InMemoryFileStore)

    def options(self, url: str):
        """Answer with the JSON listing of the directory addressed by `url`."""
        listing = self.file_store.list(self._get_path(url))
        return Response(200, json=listing)

    def delete(self, url: str):
        """Delete the file addressed by `url` and answer 200."""
        self.file_store.delete(self._get_path(url))
        return Response(200)

    def post(self, url: str, content: str | bytes):
        """Store `content` at the path addressed by `url` and answer 200."""
        self.file_store.write(self._get_path(url), content)
        return Response(200)

    def get(self, url: str):
        """Answer 200 with the file contents, or 404 when the store has no such file."""
        target = self._get_path(url)
        try:
            return Response(200, content=self.file_store.read(target))
        except FileNotFoundError:
            return Response(404)

    def _get_path(self, url: str):
        """Strip `base_url` from the front of `url` to recover the store path."""
        assert url.startswith(self.base_url)
        prefix_length = len(self.base_url)
        return url[prefix_length:]