fix - Thread-safety in BatchedWebHookFileStore (#10339)

This commit is contained in:
Ray Myers 2025-08-16 13:06:40 -05:00 committed by GitHub
parent f866da6bf2
commit ab9fb50c4f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,9 +1,10 @@
import threading
from typing import Optional, Union
from typing import Optional
import httpx
import tenacity
from openhands.core.logger import openhands_logger as logger
from openhands.storage.files import FileStore
from openhands.utils.async_utils import EXECUTOR
@ -17,8 +18,8 @@ class BatchedWebHookFileStore(FileStore):
This class wraps another FileStore implementation and sends HTTP requests
to a specified URL when files are written or deleted. Updates are batched
and sent together after a certain amount of time passes or if the content
size exceeds a threshold.
and sent together after a certain amount of time or size exceeds a threshold.
Time is counted from the last insert.
Attributes:
file_store: The underlying FileStore implementation
@ -38,7 +39,7 @@ class BatchedWebHookFileStore(FileStore):
batch_timeout_seconds: float
batch_size_limit_bytes: int
_batch_lock: threading.Lock
_batch: dict[str, tuple[str, Optional[Union[str, bytes]]]]
_batch: dict[str, tuple[str, str | bytes | None]]
_batch_timer: Optional[threading.Timer]
_batch_size: int
@ -81,7 +82,7 @@ class BatchedWebHookFileStore(FileStore):
self._batch_timer = None
self._batch_size = 0
def write(self, path: str, contents: Union[str, bytes]) -> None:
def write(self, path: str, contents: str | bytes) -> None:
"""Write contents to a file and queue a webhook update.
Args:
@ -123,7 +124,7 @@ class BatchedWebHookFileStore(FileStore):
self._queue_update(path, 'delete', None)
def _queue_update(
self, path: str, operation: str, contents: Optional[Union[str, bytes]]
self, path: str, operation: str, contents: str | bytes | None
) -> None:
"""Queue an update to be sent to the webhook.
@ -159,8 +160,7 @@ class BatchedWebHookFileStore(FileStore):
# Check if we need to send the batch due to size limit
if self._batch_size >= self.batch_size_limit_bytes:
# Submit to executor to avoid blocking
EXECUTOR.submit(self._send_batch)
self._enqueue_batch_from_lock()
return
# Start or reset the timer for sending the batch
@ -169,52 +169,55 @@ class BatchedWebHookFileStore(FileStore):
self._batch_timer = None
timer = threading.Timer(
self.batch_timeout_seconds, self._send_batch_from_timer
self.batch_timeout_seconds, self._enqueue_batch_from_timer
)
timer.daemon = True
timer.start()
self._batch_timer = timer
def _send_batch_from_timer(self) -> None:
def _enqueue_batch_from_timer(self) -> None:
"""Send the batch from the timer thread.
This method is called by the timer and submits the actual sending to the executor.
"""
EXECUTOR.submit(self._send_batch)
with self._batch_lock:
self._enqueue_batch_from_lock()
def _send_batch(self) -> None:
def _enqueue_batch_from_lock(self, background=True) -> None:
"""
Must have lock before calling. Will reset the batch state and send the current one.
Uses executor by default, but can perform synchronously by setting background=False
"""
batch_to_send = self._batch
self._batch = {}
self._batch_size = 0
if background:
EXECUTOR.submit(self._send_batch, batch_to_send)
else:
self._send_batch(batch_to_send)
# Cancel any pending timer
if self._batch_timer is not None:
self._batch_timer.cancel()
self._batch_timer = None
def _send_batch(
self, batch_to_send: dict[str, tuple[str, str | bytes | None]]
) -> None:
"""Send the current batch of updates to the webhook as a single request.
This method acquires the batch lock and processes all pending updates in one batch.
"""
batch_to_send: dict[str, tuple[str, Optional[Union[str, bytes]]]] = {}
with self._batch_lock:
if not self._batch:
return
# Copy the batch and clear the current one
batch_to_send = self._batch.copy()
self._batch.clear()
self._batch_size = 0
# Cancel any pending timer
if self._batch_timer is not None:
self._batch_timer.cancel()
self._batch_timer = None
# Process the entire batch in a single request
if batch_to_send:
try:
self._send_batch_request(batch_to_send)
except Exception as e:
# Log the error
print(f'Error sending webhook batch: {e}')
except Exception:
logger.exception('Error sending webhook batch')
@tenacity.retry(
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_attempt(3),
)
def _send_batch_request(
self, batch: dict[str, tuple[str, Optional[Union[str, bytes]]]]
self, batch: dict[str, tuple[str, str | bytes | None]]
) -> None:
"""Send a single batch request to the webhook URL with all updates.
@ -260,4 +263,5 @@ class BatchedWebHookFileStore(FileStore):
"""Immediately send any pending updates to the webhook.
This can be called to ensure all updates are sent before shutting down.
"""
self._send_batch()
with self._batch_lock:
self._enqueue_batch_from_lock(background=False)