diff --git a/openhands/storage/batched_web_hook.py b/openhands/storage/batched_web_hook.py index 9c6220b65f..6a9495d5e0 100644 --- a/openhands/storage/batched_web_hook.py +++ b/openhands/storage/batched_web_hook.py @@ -1,9 +1,10 @@ import threading -from typing import Optional, Union +from typing import Optional import httpx import tenacity +from openhands.core.logger import openhands_logger as logger from openhands.storage.files import FileStore from openhands.utils.async_utils import EXECUTOR @@ -17,8 +18,8 @@ class BatchedWebHookFileStore(FileStore): This class wraps another FileStore implementation and sends HTTP requests to a specified URL when files are written or deleted. Updates are batched - and sent together after a certain amount of time passes or if the content - size exceeds a threshold. + and sent together after a certain amount of time or size exceeds a threshold. + Time is counted from the last insert. Attributes: file_store: The underlying FileStore implementation @@ -38,7 +39,7 @@ class BatchedWebHookFileStore(FileStore): batch_timeout_seconds: float batch_size_limit_bytes: int _batch_lock: threading.Lock - _batch: dict[str, tuple[str, Optional[Union[str, bytes]]]] + _batch: dict[str, tuple[str, str | bytes | None]] _batch_timer: Optional[threading.Timer] _batch_size: int @@ -81,7 +82,7 @@ class BatchedWebHookFileStore(FileStore): self._batch_timer = None self._batch_size = 0 - def write(self, path: str, contents: Union[str, bytes]) -> None: + def write(self, path: str, contents: str | bytes) -> None: """Write contents to a file and queue a webhook update. Args: @@ -123,7 +124,7 @@ class BatchedWebHookFileStore(FileStore): self._queue_update(path, 'delete', None) def _queue_update( - self, path: str, operation: str, contents: Optional[Union[str, bytes]] + self, path: str, operation: str, contents: str | bytes | None ) -> None: """Queue an update to be sent to the webhook. @@ -159,8 +160,7 @@ class BatchedWebHookFileStore(FileStore): # Check if we need to send the batch due to size limit if self._batch_size >= self.batch_size_limit_bytes: - # Submit to executor to avoid blocking - EXECUTOR.submit(self._send_batch) + self._enqueue_batch_from_lock() return # Start or reset the timer for sending the batch @@ -169,52 +169,55 @@ class BatchedWebHookFileStore(FileStore): self._batch_timer = None timer = threading.Timer( - self.batch_timeout_seconds, self._send_batch_from_timer + self.batch_timeout_seconds, self._enqueue_batch_from_timer ) timer.daemon = True timer.start() self._batch_timer = timer - def _send_batch_from_timer(self) -> None: + def _enqueue_batch_from_timer(self) -> None: """Send the batch from the timer thread. This method is called by the timer and submits the actual sending to the executor. """ - EXECUTOR.submit(self._send_batch) + with self._batch_lock: + self._enqueue_batch_from_lock() - def _send_batch(self) -> None: + def _enqueue_batch_from_lock(self, background=True) -> None: + """ + Must have lock before calling. Will reset the batch state and send the current one. + Uses executor by default, but can perform synchronously by setting background=False + """ + batch_to_send = self._batch + self._batch = {} + self._batch_size = 0 + if background: + EXECUTOR.submit(self._send_batch, batch_to_send) + else: + self._send_batch(batch_to_send) + # Cancel any pending timer + if self._batch_timer is not None: + self._batch_timer.cancel() + self._batch_timer = None + + def _send_batch( + self, batch_to_send: dict[str, tuple[str, str | bytes | None]] + ) -> None: """Send the current batch of updates to the webhook as a single request. This method acquires the batch lock and processes all pending updates in one batch. """ - batch_to_send: dict[str, tuple[str, Optional[Union[str, bytes]]]] = {} - - with self._batch_lock: - if not self._batch: - return - - # Copy the batch and clear the current one - batch_to_send = self._batch.copy() - self._batch.clear() - self._batch_size = 0 - - # Cancel any pending timer - if self._batch_timer is not None: - self._batch_timer.cancel() - self._batch_timer = None - # Process the entire batch in a single request if batch_to_send: try: self._send_batch_request(batch_to_send) - except Exception as e: - # Log the error - print(f'Error sending webhook batch: {e}') + except Exception: + logger.exception('Error sending webhook batch') @tenacity.retry( wait=tenacity.wait_fixed(1), stop=tenacity.stop_after_attempt(3), ) def _send_batch_request( - self, batch: dict[str, tuple[str, Optional[Union[str, bytes]]]] + self, batch: dict[str, tuple[str, str | bytes | None]] ) -> None: """Send a single batch request to the webhook URL with all updates. @@ -260,4 +263,5 @@ class BatchedWebHookFileStore(FileStore): """Immediately send any pending updates to the webhook. This can be called to ensure all updates are sent before shutting down. """ - self._send_batch() + with self._batch_lock: + self._enqueue_batch_from_lock(background=False)