mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
* add to event stream sync * remove async from tests * small logging spam fix * remove swe agent * arch refactoring: use history from the event stream * refactor agents * monologue agent * ruff * planner agent * micro-agents * refactor history in evaluations * evals history refactoring * adapt evals and tests * unit testing stuck * testing micro agents, event stream * fix planner agent * fix tests * fix stuck after rename * fix test * small clean up * fix merge * fix merge issue * fix integration tests * Update agenthub/dummy_agent/agent.py * fix tests * rename more clearly; add todo; clean up
49 lines
1.4 KiB
Python
49 lines
1.4 KiB
Python
import os
|
|
|
|
from opendevin.core.logger import opendevin_logger as logger
|
|
|
|
from .files import FileStore
|
|
|
|
|
|
class InMemoryFileStore(FileStore):
    """File store that keeps every file in a dictionary held by the instance.

    Keys are the paths exactly as they were passed to ``write``; values are
    the corresponding contents. Matching in ``list`` and ``delete`` is a
    plain string-prefix comparison on those keys.
    """

    # Maps each written path to its contents.
    files: dict[str, str]

    def __init__(self) -> None:
        """Create an empty store."""
        self.files = {}

    def write(self, path: str, contents: str) -> None:
        """Store ``contents`` under ``path``, replacing any previous value."""
        self.files[path] = contents

    def read(self, path: str) -> str:
        """Return the contents stored under ``path``.

        Raises:
            FileNotFoundError: If nothing has been written to ``path``.
        """
        try:
            return self.files[path]
        except KeyError:
            # Callers expect file-like semantics, not a dictionary error.
            raise FileNotFoundError(path) from None

    def list(self, path: str) -> list[str]:
        """Return the entries found one level below ``path``.

        Every stored key that starts with ``path`` is considered. A key with
        at most one remaining '/'-separated component is returned as-is.
        Deeper keys are collapsed into a single directory entry built from
        ``path`` and the first remaining component, always ending in '/'.
        Each directory entry appears once; results follow the order in
        which the keys were written.

        Args:
            path: Prefix to list. May be given with or without a trailing '/'.

        Returns:
            File keys and directory entries (the latter ending in '/').
        """
        entries = []
        for file in self.files:
            if not file.startswith(path):
                continue
            parts = file.removeprefix(path).split('/')
            # A separator directly after ``path`` yields a leading empty part.
            if parts[0] == '':
                parts.pop(0)
            # ``parts`` is empty when ``file`` is exactly ``path``; treat that
            # as a file too rather than indexing into an empty list below.
            if len(parts) <= 1:
                entries.append(file)
                continue
            dir_path = os.path.join(path, parts[0])
            if not dir_path.endswith('/'):
                dir_path += '/'
            if dir_path not in entries:
                entries.append(dir_path)
        return entries

    def delete(self, path: str) -> None:
        """Remove every stored key that starts with ``path``.

        The match is a plain string prefix, so both a single file and
        everything stored beneath a directory-like prefix are removed.
        Failures are logged instead of being raised.
        """
        try:
            # Collect first: the dictionary must not change size while it is
            # being iterated.
            keys_to_delete = [key for key in self.files if key.startswith(path)]
            for key in keys_to_delete:
                del self.files[key]
            logger.debug(f'Cleared in-memory file store: {path}')
        except Exception as e:
            logger.error(f'Error clearing in-memory file store: {str(e)}')
|