Small refactor : EventStream as a dataclass (#4557)

This commit is contained in:
tofarr
2024-10-25 11:31:20 -06:00
committed by GitHub
parent 1f23dc89b6
commit d4e3982a6b

View File

@@ -1,5 +1,6 @@
import asyncio
import threading
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Callable, Iterable
@@ -29,24 +30,17 @@ def session_exists(sid: str, file_store: FileStore) -> bool:
return False
@dataclass
class EventStream:
sid: str
file_store: FileStore
# For each subscriber ID, there is a stack of callback functions - useful
# when there are agent delegates
_subscribers: dict[str, list[Callable]]
_cur_id: int
_lock: threading.Lock
_subscribers: dict[str, list[Callable]] = field(default_factory=dict)
_cur_id: int = 0
_lock: threading.Lock = field(default_factory=threading.Lock)
def __init__(self, sid: str, file_store: FileStore):
self.sid = sid
self.file_store = file_store
self._subscribers = {}
self._cur_id = 0
self._lock = threading.Lock()
self._reinitialize_from_file_store()
def _reinitialize_from_file_store(self) -> None:
def __post_init__(self) -> None:
try:
events = self.file_store.list(f'sessions/{self.sid}/events')
except FileNotFoundError:
@@ -173,4 +167,4 @@ class EventStream:
self.file_store.delete(f'sessions/{self.sid}')
self._cur_id = 0
# self._subscribers = {}
self._reinitialize_from_file_store()
self.__post_init__()