Files
OpenHands/opendevin/storage/memory.py
Robert Brennan dcb5d1ce0a Add permanent storage option for EventStream (#1697)
* add storage classes

* add minio

* add event stream storage

* storage test working

* use fixture

* event stream test passing

* better serialization

* factor out serialization pkg

* move more serialization

* fix tests

* fix test

* remove __all__

* add rehydration test

* add more rehydration test

* fix fixture

* fix dict init

* update tests

* lock

* regenerate tests

* Update opendevin/events/stream.py

* revert tests

* revert old integration tests

* only add fields if present

* regen tests

* pin pyarrow

* fix unit tests

* remove cause from memories

* revert tests

* regen tests
2024-05-14 11:09:45 -04:00

39 lines
1017 B
Python

import os
from .files import FileStore
class InMemoryFileStore(FileStore):
    """File store that keeps every file's contents in an in-process dict.

    Paths are plain string keys. Directories are never stored; ``list``
    derives them from the '/'-separated components of the stored keys.
    """

    # Maps each stored path to its contents.
    files: dict[str, str]

    def __init__(self):
        """Create an empty store."""
        self.files = {}

    def write(self, path: str, contents: str) -> None:
        """Store ``contents`` under ``path``, replacing any existing entry."""
        self.files[path] = contents

    def read(self, path: str) -> str:
        """Return the contents stored under ``path``.

        Raises:
            FileNotFoundError: if nothing is stored under ``path``.
        """
        if path not in self.files:
            raise FileNotFoundError(path)
        return self.files[path]

    def list(self, path: str) -> list[str]:
        """Return the entries directly beneath the directory ``path``.

        A file stored directly inside ``path`` is reported by its full key.
        A file stored deeper contributes its first-level subdirectory
        (``path`` joined with the next path component). Every entry is
        reported once, in the order the underlying keys were inserted.
        An empty ``path`` lists the top level of the store.

        A key that is exactly ``path``, or that merely shares a textual
        prefix with it (``foobar/x`` when listing ``foo``), is not beneath
        ``path`` and is left out.
        """
        # The match only ends on a component boundary when `path` is empty,
        # already ends with '/', or the rest of the key starts with '/'.
        at_boundary = path == '' or path.endswith('/')
        entries = []
        seen = set()  # O(1) duplicate check; `entries` preserves order
        for key in self.files:
            if not key.startswith(path):
                continue
            suffix = key.removeprefix(path)
            if not suffix:
                # The key is `path` itself, so there is nothing beneath it.
                continue
            if not at_boundary and not suffix.startswith('/'):
                # Shared string prefix only, e.g. 'foobar/x' vs. 'foo'.
                continue
            parts = suffix.split('/')
            if parts[0] == '':
                # Drop the empty component produced by the leading '/'.
                parts.pop(0)
            if len(parts) == 1:
                entry = key
            else:
                entry = os.path.join(path, parts[0])
            if entry not in seen:
                seen.add(entry)
                entries.append(entry)
        return entries

    def delete(self, path: str) -> None:
        """Remove the entry stored under ``path``.

        Raises:
            KeyError: if nothing is stored under ``path``. Note that this
                differs from ``read``, which raises ``FileNotFoundError``.
        """
        del self.files[path]