mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
Co-authored-by: mamoodi <mamoodiha@gmail.com> Co-authored-by: Engel Nyst <engel.nyst@gmail.com>
49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass
|
|
|
|
from openhands.core.config.openhands_config import OpenHandsConfig
|
|
from openhands.storage import get_file_store
|
|
from openhands.storage.data_models.settings import Settings
|
|
from openhands.storage.files import FileStore
|
|
from openhands.storage.settings.settings_store import SettingsStore
|
|
from openhands.utils.async_utils import call_sync_from_async
|
|
|
|
|
|
@dataclass
class FileSettingsStore(SettingsStore):
    """Settings store that keeps one JSON document inside a ``FileStore``.

    The serialized ``Settings`` are read from and written to ``path`` using
    the ``file_store`` backend.
    """

    # Backend used to read and write the serialized settings.
    file_store: FileStore
    # Location of the settings document within ``file_store``.
    path: str = 'settings.json'

    async def load(self) -> Settings | None:
        """Read the stored JSON document and build a ``Settings`` from it.

        Returns:
            The loaded settings with ``v1_enabled`` set to ``True``, or
            ``None`` when a ``FileNotFoundError`` is raised while loading.
        """
        try:
            # The file store read is synchronous, so it is dispatched through
            # ``call_sync_from_async`` rather than called directly.
            raw = await call_sync_from_async(self.file_store.read, self.path)
            loaded = Settings(**json.loads(raw))

            # Turn on V1 in OpenHands
            # We can simplify / remove this as part of V0 removal
            loaded.v1_enabled = True
        except FileNotFoundError:
            return None
        return loaded

    async def store(self, settings: Settings) -> None:
        """Serialize ``settings`` to JSON and write it to ``path``.

        The dump is requested with the ``expose_secrets`` context flag set to
        ``True`` -- presumably so secret values are persisted in full rather
        than masked; confirm against the ``Settings`` serializers.
        """
        payload = settings.model_dump_json(context={'expose_secrets': True})
        await call_sync_from_async(self.file_store.write, self.path, payload)

    @classmethod
    async def get_instance(
        cls, config: OpenHandsConfig, user_id: str | None
    ) -> FileSettingsStore:
        """Create a store backed by the file store described in ``config``.

        Args:
            config: Supplies the file store type, path and web hook options
                passed to ``get_file_store``.
            user_id: Accepted but not used by this implementation.

        Returns:
            A ``FileSettingsStore`` using the default ``path``.
        """
        backend = get_file_store(
            file_store_type=config.file_store,
            file_store_path=config.file_store_path,
            file_store_web_hook_url=config.file_store_web_hook_url,
            file_store_web_hook_headers=config.file_store_web_hook_headers,
            file_store_web_hook_batch=config.file_store_web_hook_batch,
        )
        return FileSettingsStore(backend)
|