mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
feat(enterprise): Implement two-phase scheduling for telemetry service
Add adaptive scheduling to minimize time-to-visibility for new installations while maintaining low overhead for established deployments. Two-Phase Scheduling Strategy: ------------------------------- Phase 1 (Bootstrap - No Identity): - Triggered when no user has authenticated yet (no admin email available) - Checks every 3 minutes for first user authentication - Immediately collects and uploads metrics once first user authenticates - Creates Replicated customer/instance identity on first successful upload - Goal: Minimize time between installation and vendor visibility Phase 2 (Normal Operations - Identity Established): - Triggered after identity (customer_id + instance_id) exists in database - Checks every 1 hour (reduced from 3-minute bootstrap interval) - Collects metrics every 7 days - Uploads metrics every 24 hours - Goal: Maintain visibility with minimal resource overhead Implementation Details: ----------------------- 1. Added _is_identity_established() helper method - Checks if both customer_id and instance_id exist in TelemetryIdentity table - Returns True only when identity is fully established 2. Updated _collection_loop() with adaptive intervals - Uses 3-minute interval in bootstrap phase - Switches to 1-hour interval in normal phase - Logs debug messages during bootstrap phase 3. Updated _upload_loop() with adaptive intervals and immediate upload - Uses 3-minute interval in bootstrap phase - Switches to 1-hour interval in normal phase - Detects identity creation and logs first successful upload - Continues with short interval for one cycle after identity creation 4. Added configuration constants - bootstrap_check_interval_seconds = 180 (3 minutes) - normal_check_interval_seconds = 3600 (1 hour) 5. Enhanced error handling - Falls back to bootstrap interval on errors for faster retry 6. Updated class docstring with comprehensive two-phase explanation Benefits: --------- - New installations visible within 3 minutes of first user login (vs up to 1 hour) - Established installations maintain low overhead (hourly checks vs constant polling) - Graceful handling of installations that never get users - Automatic phase transition without manual intervention - Self-optimizing based on deployment state Documentation Updates: --------------------- - Section 4.3: Added two-phase strategy overview - Class docstring: Added detailed phase explanation - Method docstrings: Clarified phase-specific behavior - Inline comments: Explained phase detection and interval selection Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
parent
e63b233b02
commit
d629699326
@ -318,6 +318,14 @@ class SystemMetricsCollector(MetricsCollector):
|
||||
|
||||
The telemetry system runs as an embedded service within the main enterprise server process, using AsyncIO background tasks managed by FastAPI's lifespan events. This approach makes it significantly harder to detect and disable compared to external Kubernetes CronJobs.
|
||||
|
||||
**Two-Phase Scheduling Strategy**: The service uses adaptive scheduling to balance rapid initial visibility with low ongoing overhead:
|
||||
|
||||
- **Phase 1 (Bootstrap)**: When the server first starts, no user has authenticated yet. The service checks every **3 minutes** for the first user authentication. Once a user authenticates and accepts Terms of Service, their email becomes available, enabling immediate identity creation and first metrics upload.
|
||||
|
||||
- **Phase 2 (Normal Operations)**: After the identity is established (customer_id and instance_id exist in database), the service switches to checking every **1 hour**, collecting metrics every **7 days**, and uploading every **24 hours**.
|
||||
|
||||
This ensures new installations become visible to the vendor within minutes of first use (worst case: 3 minutes after first login), while established installations maintain minimal resource overhead.
|
||||
|
||||
#### 4.3.1 TelemetryService - Core Service Class
|
||||
|
||||
**File**: `enterprise/server/telemetry/service.py`
|
||||
@ -344,6 +352,25 @@ class TelemetryService:
|
||||
This service runs as part of the main enterprise server process using AsyncIO
|
||||
background tasks. It starts automatically during FastAPI application startup
|
||||
and runs independently without requiring external CronJobs or maintenance workers.
|
||||
|
||||
Two-Phase Scheduling:
|
||||
---------------------
|
||||
The service uses adaptive scheduling to minimize time-to-visibility for new installations:
|
||||
|
||||
Phase 1 (Bootstrap - No Identity Established):
|
||||
- Runs when no user has authenticated yet (no admin email available)
|
||||
- Checks every 3 minutes for first user authentication
|
||||
- Immediately collects and uploads metrics once first user authenticates
|
||||
- Creates Replicated customer and instance identity on first upload
|
||||
|
||||
Phase 2 (Normal Operations - Identity Established):
|
||||
- Runs after identity (customer_id + instance_id) is created
|
||||
- Checks every hour (reduced overhead)
|
||||
- Collects metrics every 7 days
|
||||
- Uploads metrics every 24 hours
|
||||
|
||||
This ensures new installations become visible to the vendor within minutes of first use,
|
||||
while established installations maintain low resource overhead.
|
||||
"""
|
||||
|
||||
_instance: Optional['TelemetryService'] = None
|
||||
@ -368,6 +395,12 @@ class TelemetryService:
|
||||
self.upload_interval_hours = int(os.getenv('TELEMETRY_UPLOAD_INTERVAL_HOURS', '24'))
|
||||
self.license_warning_threshold_days = int(os.getenv('TELEMETRY_WARNING_THRESHOLD_DAYS', '4'))
|
||||
|
||||
# Two-phase scheduling: Before identity is established, check more frequently
|
||||
# Phase 1 (no identity): Check every 3 minutes for first user authentication
|
||||
# Phase 2 (identity exists): Check every hour for normal operations
|
||||
self.bootstrap_check_interval_seconds = 180 # 3 minutes
|
||||
self.normal_check_interval_seconds = 3600 # 1 hour
|
||||
|
||||
# Replicated API configuration - HARDCODED for security through obscurity
|
||||
# This publishable key (replicated_pk_...) is intentionally hardcoded in the source code
|
||||
# rather than in environment variables or Helm values. This makes the telemetry system:
|
||||
@ -430,23 +463,35 @@ class TelemetryService:
|
||||
logger.info("TelemetryService stopped")
|
||||
|
||||
async def _collection_loop(self):
|
||||
"""Background task that checks hourly if metrics collection is needed.
|
||||
"""Background task that checks if metrics collection is needed.
|
||||
|
||||
Sleeps for 1 hour between checks. When 7 days have passed since the last
|
||||
collection, it triggers metrics collection from all registered collectors.
|
||||
Uses two-phase scheduling:
|
||||
- Phase 1 (bootstrap): Checks every 3 minutes until identity is established
|
||||
- Phase 2 (normal): Checks every hour, collects every 7 days
|
||||
|
||||
This ensures rapid first collection after user authentication while maintaining
|
||||
low overhead for ongoing operations.
|
||||
"""
|
||||
logger.info(f"Collection loop started (interval: {self.collection_interval_days} days)")
|
||||
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# Determine check interval based on whether identity is established
|
||||
identity_established = self._is_identity_established()
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
else self.bootstrap_check_interval_seconds)
|
||||
|
||||
if not identity_established:
|
||||
logger.debug("Identity not yet established, using bootstrap interval (3 minutes)")
|
||||
|
||||
# Check if collection is needed
|
||||
if self._should_collect():
|
||||
logger.info("Starting metrics collection")
|
||||
await self._collect_metrics()
|
||||
logger.info("Metrics collection completed")
|
||||
|
||||
# Sleep for 1 hour before checking again
|
||||
await asyncio.sleep(3600)
|
||||
# Sleep until next check (interval depends on phase)
|
||||
await asyncio.sleep(check_interval)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Collection loop cancelled")
|
||||
@ -454,26 +499,48 @@ class TelemetryService:
|
||||
except Exception as e:
|
||||
logger.error(f"Error in collection loop: {e}", exc_info=True)
|
||||
# Continue running even if collection fails
|
||||
await asyncio.sleep(3600)
|
||||
# Use shorter interval on error to retry sooner
|
||||
await asyncio.sleep(self.bootstrap_check_interval_seconds)
|
||||
|
||||
async def _upload_loop(self):
|
||||
"""Background task that checks hourly if metrics upload is needed.
|
||||
"""Background task that checks if metrics upload is needed.
|
||||
|
||||
Sleeps for 1 hour between checks. When 24 hours have passed since the last
|
||||
upload, it triggers upload of pending metrics to Replicated.
|
||||
Uses two-phase scheduling:
|
||||
- Phase 1 (bootstrap): Checks every 3 minutes for first user, uploads immediately
|
||||
- Phase 2 (normal): Checks every hour, uploads every 24 hours
|
||||
|
||||
When identity is first established, triggers immediate upload to minimize time
|
||||
until vendor visibility. After that, follows normal 24-hour upload schedule.
|
||||
"""
|
||||
logger.info(f"Upload loop started (interval: {self.upload_interval_hours} hours)")
|
||||
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# Determine check interval based on whether identity is established
|
||||
identity_established = self._is_identity_established()
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
else self.bootstrap_check_interval_seconds)
|
||||
|
||||
if not identity_established:
|
||||
logger.debug("Identity not yet established, using bootstrap interval (3 minutes)")
|
||||
|
||||
# Check if upload is needed
|
||||
# In bootstrap phase, attempt upload whenever there are pending metrics
|
||||
# (upload will be skipped internally if no admin email available)
|
||||
if self._should_upload():
|
||||
logger.info("Starting metrics upload")
|
||||
was_established_before = identity_established
|
||||
await self._upload_pending_metrics()
|
||||
|
||||
# If identity was just established, it will be created during upload
|
||||
# Continue with short interval for one more cycle to ensure upload succeeds
|
||||
if not was_established_before and self._is_identity_established():
|
||||
logger.info("Identity just established - first upload completed")
|
||||
|
||||
logger.info("Metrics upload completed")
|
||||
|
||||
# Sleep for 1 hour before checking again
|
||||
await asyncio.sleep(3600)
|
||||
# Sleep until next check (interval depends on phase)
|
||||
await asyncio.sleep(check_interval)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Upload loop cancelled")
|
||||
@ -481,7 +548,8 @@ class TelemetryService:
|
||||
except Exception as e:
|
||||
logger.error(f"Error in upload loop: {e}", exc_info=True)
|
||||
# Continue running even if upload fails
|
||||
await asyncio.sleep(3600)
|
||||
# Use shorter interval on error to retry sooner
|
||||
await asyncio.sleep(self.bootstrap_check_interval_seconds)
|
||||
|
||||
async def _initial_collection_check(self):
|
||||
"""Check on startup if initial collection is needed."""
|
||||
@ -494,6 +562,26 @@ class TelemetryService:
|
||||
except Exception as e:
|
||||
logger.error(f"Error during initial collection check: {e}")
|
||||
|
||||
def _is_identity_established(self) -> bool:
|
||||
"""Check if telemetry identity has been established.
|
||||
|
||||
Returns True if we have both customer_id and instance_id in the database,
|
||||
indicating that at least one user has authenticated and we can send telemetry.
|
||||
"""
|
||||
try:
|
||||
with session_maker() as session:
|
||||
identity = session.query(TelemetryIdentity).filter(
|
||||
TelemetryIdentity.id == 1
|
||||
).first()
|
||||
|
||||
# Identity is established if we have both customer_id and instance_id
|
||||
return (identity is not None and
|
||||
identity.customer_id is not None and
|
||||
identity.instance_id is not None)
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking identity status: {e}")
|
||||
return False
|
||||
|
||||
def _should_collect(self) -> bool:
|
||||
"""Check if 7 days have passed since last collection."""
|
||||
try:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user