Fix for race condition in cache (#7812)

tofarr 2025-04-12 07:43:34 -06:00 committed by GitHub
parent 20d3766451
commit fddbfce51a


@@ -160,30 +160,38 @@ class EventStream(EventStore):
             raise ValueError(
                 f'Event already has an ID:{event.id}. It was probably added back to the EventStream from inside a handler, triggering a loop.'
             )
+        event._timestamp = datetime.now().isoformat()
+        event._source = source  # type: ignore [attr-defined]
         with self._lock:
             event._id = self.cur_id  # type: ignore [attr-defined]
             self.cur_id += 1
-        logger.debug(f'Adding {type(event).__name__} id={event.id} from {source.name}')
-        event._timestamp = datetime.now().isoformat()
-        event._source = source  # type: ignore [attr-defined]
+            logger.debug(f'Event to add: {event}')
+            data = event_to_dict(event)
+            data = self._replace_secrets(data)
+            event = event_from_dict(data)
+            # Take a copy of the current write page
+            current_write_page = self._write_page_cache
-        data = event_to_dict(event)
-        data = self._replace_secrets(data)
-        event = event_from_dict(data)
+            current_write_page.append(data)
+            # If the page is full, create a new page for future events / other threads to use
+            if len(current_write_page) == self.cache_size:
+                self._write_page_cache = []
         if event.id is not None:
+            # Write the event to the store - this can take some time
             self.file_store.write(
                 self._get_filename_for_id(event.id, self.user_id), json.dumps(data)
             )
-            self._write_page_cache.append(data)
-            self._store_cache_page()
+            # Store the cache page last - if it is not present during reads then it will simply be bypassed.
+            self._store_cache_page(current_write_page)
         self._queue.put(event)
 
-    def _store_cache_page(self):
+    def _store_cache_page(self, current_write_page: list[dict]):
         """Store a page in the cache. Reading individual events is slow when there are a lot of them, so we use pages."""
-        current_write_page = self._write_page_cache
         if len(current_write_page) < self.cache_size:
             return
-        self._write_page_cache = []
         start = current_write_page[0]['id']
         end = start + self.cache_size
         contents = json.dumps(current_write_page)
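
For readers skimming the diff, the shape of the change: previously the shared _write_page_cache was appended to and swapped out outside self._lock, so concurrent add_event calls could race on that shared list and produce cache pages with missing, duplicated, or mis-ordered events. The patch does all page bookkeeping while the lock is held, keeps only a local reference to the (possibly detached) page, and performs the slow file_store.write calls after the lock is released, writing the cache page last so a reader that does not find it simply falls back to individual events. The sketch below is a minimal, hypothetical illustration of that locking pattern; the class and store names are invented for the example and it is not the patched EventStream itself.

# Hypothetical illustration of the pattern above -- invented names, not the
# project's actual EventStream / FileStore API.
import json
import threading


class InMemoryFileStore:
    """Stand-in for the real file store."""

    def __init__(self) -> None:
        self.files: dict[str, str] = {}

    def write(self, path: str, contents: str) -> None:
        self.files[path] = contents


class PageBufferedStream:
    """Buffers events into fixed-size pages so readers can load one page
    instead of many individual event files."""

    def __init__(self, file_store: InMemoryFileStore, cache_size: int = 4) -> None:
        self.file_store = file_store
        self.cache_size = cache_size
        self.cur_id = 0
        self._lock = threading.Lock()
        self._write_page_cache: list[dict] = []

    def add_event(self, data: dict) -> None:
        # All shared-state mutation happens under the lock: ID assignment,
        # appending to the shared page, and detaching the page once it is full.
        with self._lock:
            data['id'] = self.cur_id
            self.cur_id += 1
            current_write_page = self._write_page_cache
            current_write_page.append(data)
            page_full = len(current_write_page) == self.cache_size
            if page_full:
                # Future events (from any thread) go to a fresh page.
                self._write_page_cache = []
        # Slow I/O runs outside the lock and touches only local references.
        self.file_store.write(f'events/{data["id"]}.json', json.dumps(data))
        if page_full:
            # The page is written last; a reader that does not find it yet
            # simply falls back to reading individual events.
            start = current_write_page[0]['id']
            end = start + self.cache_size
            self.file_store.write(
                f'events/cache_{start}-{end}.json', json.dumps(current_write_page)
            )


if __name__ == '__main__':
    store = InMemoryFileStore()
    stream = PageBufferedStream(store, cache_size=4)
    threads = [
        threading.Thread(target=lambda i=i: stream.add_event({'n': i}))
        for i in range(10)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # Expect 10 individual event files plus 2 full cache pages of 4 events each.
    print(sorted(store.files))

Because ID assignment and the append happen in the same critical section, each page holds a consecutive run of IDs, which is what lets the page filename be derived from the first ID plus the page size, as in the patched _store_cache_page.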