Add AwsSharedEventService for shared conversations (#13141)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
chuckbutkus
2026-03-13 14:32:58 -04:00
committed by GitHub
parent 0527c46bba
commit 922e3a2431
10 changed files with 1529 additions and 8 deletions

View File

@@ -56,6 +56,7 @@ from openhands.app_server.web_client.web_client_config_injector import (
)
from openhands.sdk.utils.models import OpenHandsModel
from openhands.server.types import AppMode
from openhands.utils.environment import StorageProvider, get_storage_provider
def get_default_persistence_dir() -> Path:
@@ -140,6 +141,9 @@ def config_from_env() -> AppServerConfig:
from openhands.app_server.app_conversation.sql_app_conversation_start_task_service import ( # noqa: E501
SQLAppConversationStartTaskServiceInjector,
)
from openhands.app_server.event.aws_event_service import (
AwsEventServiceInjector,
)
from openhands.app_server.event.filesystem_event_service import (
FilesystemEventServiceInjector,
)
@@ -174,8 +178,18 @@ def config_from_env() -> AppServerConfig:
config: AppServerConfig = from_env(AppServerConfig, 'OH') # type: ignore
if config.event is None:
if os.environ.get('FILE_STORE') == 'google_cloud':
# Legacy V0 google cloud storage configuration
provider = get_storage_provider()
if provider == StorageProvider.AWS:
# AWS S3 storage configuration
bucket_name = os.environ.get('FILE_STORE_PATH')
if not bucket_name:
raise ValueError(
'FILE_STORE_PATH environment variable is required for S3 storage'
)
config.event = AwsEventServiceInjector(bucket_name=bucket_name)
elif provider == StorageProvider.GCP:
# Google Cloud storage configuration
config.event = GoogleCloudEventServiceInjector(
bucket_name=os.environ.get('FILE_STORE_PATH')
)

View File

@@ -0,0 +1,113 @@
"""AWS S3-based EventService implementation.
This implementation uses role-based authentication (no credentials needed).
"""
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, AsyncGenerator
import boto3
import botocore.exceptions
from fastapi import Request
from openhands.app_server.config import get_app_conversation_info_service
from openhands.app_server.event.event_service import EventService, EventServiceInjector
from openhands.app_server.event.event_service_base import EventServiceBase
from openhands.app_server.services.injector import InjectorState
from openhands.sdk import Event
_logger = logging.getLogger(__name__)
@dataclass
class AwsEventService(EventServiceBase):
"""AWS S3-based implementation of EventService.
Uses role-based authentication, so no explicit credentials are needed.
"""
s3_client: Any
bucket_name: str
def _load_event(self, path: Path) -> Event | None:
"""Get the event at the path given."""
try:
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=str(path))
with response['Body'] as stream:
json_data = stream.read().decode('utf-8')
event = Event.model_validate_json(json_data)
return event
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
return None
_logger.exception(f'Error reading event from {path}')
return None
except Exception:
_logger.exception(f'Error reading event from {path}')
return None
def _store_event(self, path: Path, event: Event):
"""Store the event given at the path given."""
data = event.model_dump(mode='json')
json_str = json.dumps(data, indent=2)
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=str(path),
Body=json_str.encode('utf-8'),
)
def _search_paths(self, prefix: Path, page_id: str | None = None) -> list[Path]:
"""Search paths."""
kwargs: dict[str, Any] = {
'Bucket': self.bucket_name,
'Prefix': str(prefix),
}
if page_id:
kwargs['ContinuationToken'] = page_id
response = self.s3_client.list_objects_v2(**kwargs)
contents = response.get('Contents', [])
paths = [Path(obj['Key']) for obj in contents]
return paths
class AwsEventServiceInjector(EventServiceInjector):
bucket_name: str
prefix: Path = Path('users')
async def inject(
self, state: InjectorState, request: Request | None = None
) -> AsyncGenerator[EventService, None]:
from openhands.app_server.config import (
get_user_context,
)
async with (
get_user_context(state, request) as user_context,
get_app_conversation_info_service(
state, request
) as app_conversation_info_service,
):
user_id = await user_context.get_user_id()
bucket_name = self.bucket_name
# Use role-based authentication - boto3 will automatically
# use IAM role credentials when running in AWS
s3_client = boto3.client(
's3',
endpoint_url=os.getenv('AWS_S3_ENDPOINT'),
)
yield AwsEventService(
prefix=self.prefix,
user_id=user_id,
app_conversation_info_service=app_conversation_info_service,
s3_client=s3_client,
bucket_name=bucket_name,
app_conversation_info_load_tasks={},
)

View File

@@ -1,6 +1,7 @@
from __future__ import annotations
import os
from enum import Enum
from functools import lru_cache
from pathlib import Path
@@ -9,6 +10,38 @@ _LEMONADE_PROVIDER_NAME = 'lemonade'
_LEMONADE_MODEL_PREFIX = 'lemonade/'
class StorageProvider(str, Enum):
"""Storage provider types for event and shared event storage."""
AWS = 'aws'
GCP = 'gcp'
FILESYSTEM = 'filesystem'
def get_storage_provider() -> StorageProvider:
"""Get the storage provider based on environment variables.
Determines the storage provider from environment configuration:
- SHARED_EVENT_STORAGE_PROVIDER: Primary setting, supports 'aws', 'gcp', 'google_cloud'
- FILE_STORE: Legacy fallback, supports 'google_cloud'
Returns:
StorageProvider: The configured storage provider (AWS, GCP, or FILESYSTEM)
"""
provider = os.environ.get('SHARED_EVENT_STORAGE_PROVIDER', '').lower()
# If not explicitly set, fall back to FILE_STORE
if not provider:
provider = os.environ.get('FILE_STORE', '').lower()
if provider == 'aws':
return StorageProvider.AWS
elif provider in ('gcp', 'google_cloud'):
return StorageProvider.GCP
else:
return StorageProvider.FILESYSTEM
@lru_cache(maxsize=1)
def is_running_in_docker() -> bool:
"""Best-effort detection for Docker containers."""