diff --git a/enterprise/doc/design-doc/openhands-enterprise-telemetry-design.md b/enterprise/doc/design-doc/openhands-enterprise-telemetry-design.md index 4fc9f72c00..52a7bc854e 100644 --- a/enterprise/doc/design-doc/openhands-enterprise-telemetry-design.md +++ b/enterprise/doc/design-doc/openhands-enterprise-telemetry-design.md @@ -20,15 +20,18 @@ - 4.2.1 [Base Collector Interface](#421-base-collector-interface) - 4.2.2 [Collector Registry](#422-collector-registry) - 4.2.3 [Example Collector Implementation](#423-example-collector-implementation) - - 4.3 [Collection and Upload System](#43-collection-and-upload-system) - - 4.3.1 [Metrics Collection Processor](#431-metrics-collection-processor) - - 4.3.2 [Replicated Upload Processor](#432-replicated-upload-processor) + - 4.3 [Embedded Telemetry Service](#43-embedded-telemetry-service) + - 4.3.1 [TelemetryService - Core Service Class](#431-telemetryservice---core-service-class) + - 4.3.2 [FastAPI Lifespan Integration](#432-fastapi-lifespan-integration) + - 4.3.3 [Enterprise Server Integration](#433-enterprise-server-integration) - 4.4 [License Warning System](#44-license-warning-system) - 4.4.1 [License Status Endpoint](#441-license-status-endpoint) - 4.4.2 [UI Integration](#442-ui-integration) - - 4.5 [Cronjob Configuration](#45-cronjob-configuration) - - 4.5.1 [Collection Cronjob](#451-collection-cronjob) - - 4.5.2 [Upload Cronjob](#452-upload-cronjob) + - 4.5 [Environment Configuration](#45-environment-configuration) + - 4.5.1 [Required Environment Variables](#451-required-environment-variables) + - 4.5.2 [Helm Chart Values](#452-helm-chart-values) + - 4.5.3 [Helm Secret Configuration](#453-helm-secret-configuration) + - 4.5.4 [Deployment Environment Variables](#454-deployment-environment-variables) 5. 
[Implementation Plan](#5-implementation-plan) - 5.1 [Database Schema and Models (M1)](#51-database-schema-and-models-m1) - 5.1.1 [OpenHands - Database Migration](#511-openhands---database-migration) @@ -37,9 +40,9 @@ - 5.2.1 [OpenHands - Core Collection Framework](#521-openhands---core-collection-framework) - 5.2.2 [OpenHands - Example Collectors](#522-openhands---example-collectors) - 5.2.3 [OpenHands - Framework Tests](#523-openhands---framework-tests) - - 5.3 [Collection and Upload Processors (M3)](#53-collection-and-upload-processors-m3) - - 5.3.1 [OpenHands - Collection Processor](#531-openhands---collection-processor) - - 5.3.2 [OpenHands - Upload Processor](#532-openhands---upload-processor) + - 5.3 [Embedded Telemetry Service (M3)](#53-embedded-telemetry-service-m3) + - 5.3.1 [OpenHands - Telemetry Service](#531-openhands---telemetry-service) + - 5.3.2 [OpenHands - Server Integration](#532-openhands---server-integration) - 5.3.3 [OpenHands - Integration Tests](#533-openhands---integration-tests) - 5.4 [License Warning API (M4)](#54-license-warning-api-m4) - 5.4.1 [OpenHands - License Status API](#541-openhands---license-status-api) @@ -47,9 +50,10 @@ - 5.5 [UI Warning Banner (M5)](#55-ui-warning-banner-m5) - 5.5.1 [OpenHands - UI Warning Banner](#551-openhands---ui-warning-banner) - 5.5.2 [OpenHands - UI Integration](#552-openhands---ui-integration) - - 5.6 [Helm Chart Deployment Configuration (M6)](#56-helm-chart-deployment-configuration-m6) - - 5.6.1 [OpenHands-Cloud - Cronjob Manifests](#561-openhands-cloud---cronjob-manifests) - - 5.6.2 [OpenHands-Cloud - Configuration Management](#562-openhands-cloud---configuration-management) + - 5.6 [Helm Chart Environment Configuration (M6)](#56-helm-chart-environment-configuration-m6) + - 5.6.1 [OpenHands-Cloud - Secret Management](#561-openhands-cloud---secret-management) + - 5.6.2 [OpenHands-Cloud - Values Configuration](#562-openhands-cloud---values-configuration) + - 5.6.3 [OpenHands-Cloud - Deployment Environment Variables](#563-openhands-cloud---deployment-environment-variables) - 5.7 [Documentation and Enhanced Collectors (M7)](#57-documentation-and-enhanced-collectors-m7) - 5.7.1 [OpenHands - Advanced Collectors](#571-openhands---advanced-collectors) - 5.7.2 [OpenHands - Monitoring and Testing](#572-openhands---monitoring-and-testing) @@ -65,9 +69,9 @@ OpenHands Enterprise (OHE) helm charts are publicly available but not open sourc We propose implementing a comprehensive telemetry service that leverages the Replicated metrics platform and Python SDK to track OHE installations and usage. The solution provides automatic customer discovery, instance monitoring, and usage metrics collection while maintaining a clear license compliance pathway. -The system consists of three main components: (1) a pluggable metrics collection framework that allows developers to easily define and register custom metrics collectors, (2) automated cronjobs that periodically collect metrics and upload them to Replicated's vendor portal, and (3) a license compliance warning system that displays UI notifications when telemetry uploads fail, indicating potential license expiration. 
+The system consists of three main components: (1) a pluggable metrics collection framework that allows developers to easily define and register custom metrics collectors, (2) an embedded background service that runs within the main enterprise server process using AsyncIO to periodically collect metrics and upload them to Replicated's vendor portal, and (3) a license compliance warning system that displays UI notifications when telemetry uploads fail, indicating potential license expiration. -The design ensures that telemetry cannot be easily disabled without breaking core OHE functionality by tying the warning system to environment variables that are essential for OHE operation. This approach balances user transparency with business requirements for customer visibility. +The design ensures that telemetry cannot be easily disabled by embedding it deeply into the enterprise server code. The telemetry service starts automatically with the FastAPI application using lifespan events and runs as independent AsyncIO background tasks. This approach makes the telemetry system significantly harder to detect and disable compared to external Kubernetes CronJobs, while balancing user transparency with business requirements for customer visibility. ## 2. User Interface @@ -94,7 +98,7 @@ System administrators will not need to configure the telemetry system manually. - Instance ID: Automatically generated by Replicated SDK using machine fingerprinting (IOPlatformUUID on macOS, D-Bus machine ID on Linux, Machine GUID on Windows) - **No Fallback**: If neither email source is available, telemetry collection is skipped until at least one user exists -3. **Collects and uploads metrics transparently** in the background via weekly collection and daily upload cronjobs +3. **Collects and uploads metrics transparently** in the background via AsyncIO tasks that run within the main server process (weekly collection, daily upload) 4. **Displays warnings only when necessary** for license compliance - no notifications appear during normal operation @@ -135,7 +139,7 @@ class UserActivityCollector(MetricsCollector): return {"active_users_7d": count, "conversations_created": total} ``` -Collectors are automatically discovered and executed by the collection cronjob, making the system extensible without modifying core collection logic. +Collectors are automatically discovered and executed by the background collection task, making the system extensible without modifying core collection logic. ## 4. Technical Design @@ -289,200 +293,489 @@ class SystemMetricsCollector(MetricsCollector): return results ``` -### 4.3 Collection and Upload System +### 4.3 Embedded Telemetry Service -#### 4.3.1 Metrics Collection Processor +The telemetry system runs as an embedded service within the main enterprise server process, using AsyncIO background tasks managed by FastAPI's lifespan events. This approach makes it significantly harder to detect and disable compared to external Kubernetes CronJobs. 
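+
+The service code in 4.3.1 assumes the collector registry interface from section 4.2.2 (`CollectorRegistry`, `get_all_collectors()`, and the `@register_collector` decorator), which is unchanged by this diff. For readers of this section in isolation, a minimal sketch of that assumed interface follows; the names are taken from the surrounding code and the body is illustrative only, not the real implementation:
+
+```python
+# Illustrative sketch only: the real implementation lives in
+# telemetry/registry.py (section 4.2.2) and is not modified by this change.
+
+
+class CollectorRegistry:
+    """Holds collector classes registered via the @register_collector decorator."""
+
+    _collector_classes: list[type] = []
+
+    @classmethod
+    def register(cls, collector_cls: type) -> type:
+        cls._collector_classes.append(collector_cls)
+        return collector_cls
+
+    def get_all_collectors(self) -> list:
+        # Instantiate each registered collector on demand
+        return [collector_cls() for collector_cls in self._collector_classes]
+
+
+def register_collector(collector_cls: type) -> type:
+    """Class decorator that makes a collector discoverable by the registry."""
+    return CollectorRegistry.register(collector_cls)
+```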
+ +#### 4.3.1 TelemetryService - Core Service Class + +**File**: `enterprise/server/telemetry/service.py` ```python -class TelemetryCollectionProcessor(MaintenanceTaskProcessor): - """Maintenance task processor for collecting metrics.""" +"""Embedded telemetry service that runs as part of the enterprise server process.""" - collection_interval_days: int = 7 +import asyncio +import os +from datetime import datetime, timedelta, timezone +from typing import Optional - async def __call__(self, task: MaintenanceTask) -> dict: - """Collect metrics from all registered collectors.""" +from server.logger import logger +from storage.database import session_maker +from storage.telemetry_identity import TelemetryIdentity +from storage.telemetry_metrics import TelemetryMetrics +from storage.user_settings import UserSettings +from telemetry.registry import CollectorRegistry - # Check if collection is needed - if not self._should_collect(): - return {"status": "skipped", "reason": "too_recent"} - # Collect metrics from all registered collectors - all_metrics = {} - collector_results = {} - - for collector in collector_registry.get_all_collectors(): +class TelemetryService: + """Singleton service for managing embedded telemetry collection and upload. + + This service runs as part of the main enterprise server process using AsyncIO + background tasks. It starts automatically during FastAPI application startup + and runs independently without requiring external CronJobs or maintenance workers. + """ + + _instance: Optional['TelemetryService'] = None + _initialized: bool = False + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if self._initialized: + return + + self._initialized = True + self._collection_task: Optional[asyncio.Task] = None + self._upload_task: Optional[asyncio.Task] = None + self._shutdown_event = asyncio.Event() + + # Configuration (from environment or defaults) + self.collection_interval_days = int(os.getenv('TELEMETRY_COLLECTION_INTERVAL_DAYS', '7')) + self.upload_interval_hours = int(os.getenv('TELEMETRY_UPLOAD_INTERVAL_HOURS', '24')) + self.license_warning_threshold_days = int(os.getenv('TELEMETRY_WARNING_THRESHOLD_DAYS', '4')) + + # Replicated configuration + self.replicated_publishable_key = os.getenv('REPLICATED_PUBLISHABLE_KEY') + self.replicated_app_slug = os.getenv('REPLICATED_APP_SLUG', 'openhands-enterprise') + + logger.info("TelemetryService initialized") + + async def start(self): + """Start the telemetry service background tasks. + + Called automatically during FastAPI application startup via lifespan events. + """ + if self._collection_task is not None or self._upload_task is not None: + logger.warning("TelemetryService already started") + return + + logger.info("Starting TelemetryService background tasks") + + # Start independent background loops + self._collection_task = asyncio.create_task(self._collection_loop()) + self._upload_task = asyncio.create_task(self._upload_loop()) + + # Run initial collection if needed (don't wait for 7-day interval) + asyncio.create_task(self._initial_collection_check()) + + async def stop(self): + """Stop the telemetry service and perform cleanup. + + Called automatically during FastAPI application shutdown via lifespan events. 
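+        Safe to call even if start() was never invoked; the task handles are simply None.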
+ """ + logger.info("Stopping TelemetryService") + + self._shutdown_event.set() + + # Cancel background tasks + if self._collection_task: + self._collection_task.cancel() try: - if collector.should_collect(): - results = collector.collect() - for result in results: - all_metrics[result.key] = result.value - collector_results[collector.collector_name] = len(results) + await self._collection_task + except asyncio.CancelledError: + pass + + if self._upload_task: + self._upload_task.cancel() + try: + await self._upload_task + except asyncio.CancelledError: + pass + + logger.info("TelemetryService stopped") + + async def _collection_loop(self): + """Background task that checks hourly if metrics collection is needed. + + Sleeps for 1 hour between checks. When 7 days have passed since the last + collection, it triggers metrics collection from all registered collectors. + """ + logger.info(f"Collection loop started (interval: {self.collection_interval_days} days)") + + while not self._shutdown_event.is_set(): + try: + # Check if collection is needed + if self._should_collect(): + logger.info("Starting metrics collection") + await self._collect_metrics() + logger.info("Metrics collection completed") + + # Sleep for 1 hour before checking again + await asyncio.sleep(3600) + + except asyncio.CancelledError: + logger.info("Collection loop cancelled") + break except Exception as e: - logger.error(f"Collector {collector.collector_name} failed: {e}") - collector_results[collector.collector_name] = f"error: {e}" - - # Store metrics in database - with session_maker() as session: - telemetry_record = TelemetryMetrics( - metrics_data=all_metrics, - collected_at=datetime.now(timezone.utc) - ) - session.add(telemetry_record) - session.commit() - - # Note: No need to track last_collection_at separately - # Can be derived from MAX(collected_at) in telemetry_metrics - - return { - "status": "completed", - "metrics_collected": len(all_metrics), - "collectors_run": collector_results - } - + logger.error(f"Error in collection loop: {e}", exc_info=True) + # Continue running even if collection fails + await asyncio.sleep(3600) + + async def _upload_loop(self): + """Background task that checks hourly if metrics upload is needed. + + Sleeps for 1 hour between checks. When 24 hours have passed since the last + upload, it triggers upload of pending metrics to Replicated. 
+ """ + logger.info(f"Upload loop started (interval: {self.upload_interval_hours} hours)") + + while not self._shutdown_event.is_set(): + try: + # Check if upload is needed + if self._should_upload(): + logger.info("Starting metrics upload") + await self._upload_pending_metrics() + logger.info("Metrics upload completed") + + # Sleep for 1 hour before checking again + await asyncio.sleep(3600) + + except asyncio.CancelledError: + logger.info("Upload loop cancelled") + break + except Exception as e: + logger.error(f"Error in upload loop: {e}", exc_info=True) + # Continue running even if upload fails + await asyncio.sleep(3600) + + async def _initial_collection_check(self): + """Check on startup if initial collection is needed.""" + try: + with session_maker() as session: + count = session.query(TelemetryMetrics).count() + if count == 0: + logger.info("No existing metrics found, running initial collection") + await self._collect_metrics() + except Exception as e: + logger.error(f"Error during initial collection check: {e}") + def _should_collect(self) -> bool: - """Check if collection is needed based on interval.""" - with session_maker() as session: - # Get last collection time from metrics table - last_collected = session.query(func.max(TelemetryMetrics.collected_at)).scalar() - if not last_collected: - return True - - time_since_last = datetime.now(timezone.utc) - last_collected - return time_since_last.days >= self.collection_interval_days -``` - -#### 4.3.2 Replicated Upload Processor - -```python -from replicated import AsyncReplicatedClient, InstanceStatus - -class TelemetryUploadProcessor(MaintenanceTaskProcessor): - """Maintenance task processor for uploading metrics to Replicated.""" - - replicated_publishable_key: str - replicated_app_slug: str - - async def __call__(self, task: MaintenanceTask) -> dict: - """Upload pending metrics to Replicated.""" - - # Get pending metrics - with session_maker() as session: - pending_metrics = session.query(TelemetryMetrics)\ - .filter(TelemetryMetrics.uploaded_at.is_(None))\ - .order_by(TelemetryMetrics.collected_at)\ - .all() - - if not pending_metrics: - return {"status": "no_pending_metrics"} - - # Get admin email - skip if not available - admin_email = self._get_admin_email() - if not admin_email: - logger.info("Skipping telemetry upload - no admin email available") - return { - "status": "skipped", - "reason": "no_admin_email", - "total_processed": 0 - } - - uploaded_count = 0 - failed_count = 0 - - async with AsyncReplicatedClient( - publishable_key=self.replicated_publishable_key, - app_slug=self.replicated_app_slug - ) as client: - - # Get or create customer and instance - customer = await client.customer.get_or_create( - email_address=admin_email - ) - instance = await customer.get_or_create_instance() - - # Store customer/instance IDs for future use - await self._update_telemetry_identity(customer.customer_id, instance.instance_id) - - # Upload each metric batch - for metric_record in pending_metrics: + """Check if 7 days have passed since last collection.""" + try: + with session_maker() as session: + last_metric = ( + session.query(TelemetryMetrics) + .order_by(TelemetryMetrics.collected_at.desc()) + .first() + ) + + if not last_metric: + return True # First collection + + days_since = (datetime.now(timezone.utc) - last_metric.collected_at).days + return days_since >= self.collection_interval_days + except Exception as e: + logger.error(f"Error checking collection status: {e}") + return False + + def _should_upload(self) -> bool: + 
"""Check if 24 hours have passed since last upload.""" + try: + with session_maker() as session: + last_uploaded = ( + session.query(TelemetryMetrics) + .filter(TelemetryMetrics.uploaded_at.isnot(None)) + .order_by(TelemetryMetrics.uploaded_at.desc()) + .first() + ) + + if not last_uploaded: + # Check if we have any pending metrics to upload + pending_count = session.query(TelemetryMetrics).filter( + TelemetryMetrics.uploaded_at.is_(None) + ).count() + return pending_count > 0 + + hours_since = (datetime.now(timezone.utc) - last_uploaded.uploaded_at).total_seconds() / 3600 + return hours_since >= self.upload_interval_hours + except Exception as e: + logger.error(f"Error checking upload status: {e}") + return False + + async def _collect_metrics(self): + """Collect metrics from all registered collectors and store in database.""" + try: + # Get all registered collectors + registry = CollectorRegistry() + collectors = registry.get_all_collectors() + + # Collect metrics from each collector + all_metrics = {} + collector_results = {} + + for collector in collectors: try: - # Send individual metrics - for key, value in metric_record.metrics_data.items(): - await instance.send_metric(key, value) - - # Update instance status - await instance.set_status(InstanceStatus.RUNNING) - - # Mark as uploaded - with session_maker() as session: - record = session.query(TelemetryMetrics)\ - .filter(TelemetryMetrics.id == metric_record.id)\ - .first() - if record: - record.uploaded_at = datetime.now(timezone.utc) - session.commit() - - uploaded_count += 1 - + if collector.should_collect(): + results = collector.collect() + for result in results: + all_metrics[result.key] = result.value + collector_results[collector.collector_name] = len(results) + logger.info(f"Collected {len(results)} metrics from {collector.collector_name}") except Exception as e: - logger.error(f"Failed to upload metrics {metric_record.id}: {e}") - - # Update error info - with session_maker() as session: - record = session.query(TelemetryMetrics)\ - .filter(TelemetryMetrics.id == metric_record.id)\ - .first() - if record: - record.upload_attempts += 1 - record.last_upload_error = str(e) - session.commit() - - failed_count += 1 - - # Note: No need to track last_successful_upload_at separately - # Can be derived from MAX(uploaded_at) in telemetry_metrics - - return { - "status": "completed", - "uploaded": uploaded_count, - "failed": failed_count, - "total_processed": len(pending_metrics) - } - - def _get_admin_email(self) -> str | None: - """Get administrator email for customer identification.""" - # 1. Check environment variable first - env_admin_email = os.getenv('OPENHANDS_ADMIN_EMAIL') - if env_admin_email: - logger.info("Using admin email from environment variable") - return env_admin_email - - # 2. 
Use first active user's email (earliest accepted_tos) - with session_maker() as session: - first_user = session.query(UserSettings)\ - .filter(UserSettings.email.isnot(None))\ - .filter(UserSettings.accepted_tos.isnot(None))\ - .order_by(UserSettings.accepted_tos.asc())\ + logger.error(f"Collector {collector.collector_name} failed: {e}", exc_info=True) + collector_results[collector.collector_name] = f"error: {str(e)}" + + # Store metrics in database + with session_maker() as session: + telemetry_record = TelemetryMetrics( + metrics_data=all_metrics, + collected_at=datetime.now(timezone.utc) + ) + session.add(telemetry_record) + session.commit() + + logger.info(f"Stored {len(all_metrics)} metrics in database") + + except Exception as e: + logger.error(f"Error during metrics collection: {e}", exc_info=True) + + async def _upload_pending_metrics(self): + """Upload pending metrics to Replicated.""" + if not self.replicated_publishable_key: + logger.warning("REPLICATED_PUBLISHABLE_KEY not set, skipping upload") + return + + try: + from replicated_sdk import Client, Instance + + # Get pending metrics + with session_maker() as session: + pending_metrics = ( + session.query(TelemetryMetrics) + .filter(TelemetryMetrics.uploaded_at.is_(None)) + .order_by(TelemetryMetrics.collected_at) + .all() + ) + + if not pending_metrics: + logger.info("No pending metrics to upload") + return + + # Get admin email - skip if not available + admin_email = self._get_admin_email(session) + if not admin_email: + logger.warning("No admin email available, skipping upload") + return + + # Get or create identity + identity = self._get_or_create_identity(session, admin_email) + + # Initialize Replicated client + client = Client( + api_token=self.replicated_publishable_key, + app_slug=self.replicated_app_slug + ) + + # Upload each pending metric + successful_count = 0 + for metric in pending_metrics: + try: + # Send to Replicated + client.send_instance_data( + customer_id=identity.customer_id, + instance_id=identity.instance_id, + data=metric.metrics_data, + status=Instance.Status.RUNNING + ) + + # Mark as uploaded + metric.uploaded_at = datetime.now(timezone.utc) + metric.upload_attempts += 1 + metric.last_upload_error = None + successful_count += 1 + + logger.info(f"Uploaded metric {metric.id} to Replicated") + + except Exception as e: + metric.upload_attempts += 1 + metric.last_upload_error = str(e) + logger.error(f"Error uploading metric {metric.id}: {e}") + + session.commit() + logger.info(f"Successfully uploaded {successful_count}/{len(pending_metrics)} metrics") + + except Exception as e: + logger.error(f"Error during metrics upload: {e}", exc_info=True) + + def _get_admin_email(self, session) -> Optional[str]: + """Determine admin email from environment or database.""" + # Try environment variable first + admin_email = os.getenv('OPENHANDS_ADMIN_EMAIL') + if admin_email: + logger.info("Using admin email from OPENHANDS_ADMIN_EMAIL environment variable") + return admin_email + + # Try first user who accepted ToS + try: + first_user = ( + session.query(UserSettings) + .filter(UserSettings.accepted_tos == True) # noqa: E712 + .filter(UserSettings.email.isnot(None)) + .order_by(UserSettings.accepted_tos_timestamp) .first() - + ) if first_user and first_user.email: logger.info(f"Using first active user email: {first_user.email}") return first_user.email - - # No admin email available - skip telemetry - logger.info("No admin email available - skipping telemetry collection") + except Exception as e: + logger.error(f"Error 
determining admin email: {e}") + return None + + def _get_or_create_identity(self, session, admin_email: str) -> TelemetryIdentity: + """Get or create telemetry identity with customer and instance IDs.""" + identity = session.query(TelemetryIdentity).filter( + TelemetryIdentity.id == 1 + ).first() + + if not identity: + identity = TelemetryIdentity(id=1) + session.add(identity) + + # Set customer_id to admin email if not already set + if not identity.customer_id: + identity.customer_id = admin_email + + # Generate instance_id using Replicated SDK if not set + if not identity.instance_id: + try: + from replicated_sdk import Client + client = Client( + api_token=self.replicated_publishable_key, + app_slug=self.replicated_app_slug + ) + identity.instance_id = client.instance_id + except Exception as e: + logger.error(f"Error generating instance_id: {e}") + # Generate a fallback UUID if Replicated SDK fails + import uuid + identity.instance_id = str(uuid.uuid4()) + + session.commit() + return identity + + def get_license_warning_status(self) -> dict: + """Get current license warning status for UI display. + + Returns: + dict with 'should_warn', 'days_since_upload', and 'message' keys + """ + try: + with session_maker() as session: + last_uploaded = ( + session.query(TelemetryMetrics) + .filter(TelemetryMetrics.uploaded_at.isnot(None)) + .order_by(TelemetryMetrics.uploaded_at.desc()) + .first() + ) + + if not last_uploaded: + return { + 'should_warn': False, + 'days_since_upload': None, + 'message': 'No uploads yet' + } + + days_since_upload = ( + datetime.now(timezone.utc) - last_uploaded.uploaded_at + ).days + + should_warn = days_since_upload > self.license_warning_threshold_days + + return { + 'should_warn': should_warn, + 'days_since_upload': days_since_upload, + 'message': f'Last upload: {days_since_upload} days ago' + } + except Exception as e: + logger.error(f"Error getting license warning status: {e}") + return { + 'should_warn': False, + 'days_since_upload': None, + 'message': f'Error: {str(e)}' + } - async def _update_telemetry_identity(self, customer_id: str, instance_id: str) -> None: - """Update or create telemetry identity record.""" - with session_maker() as session: - identity = session.query(TelemetryIdentity).first() - if not identity: - identity = TelemetryIdentity() - session.add(identity) - identity.customer_id = customer_id - identity.instance_id = instance_id - session.commit() +# Global singleton instance +telemetry_service = TelemetryService() +``` + +#### 4.3.2 FastAPI Lifespan Integration + +**File**: `enterprise/server/telemetry/lifecycle.py` + +```python +"""FastAPI lifespan integration for the embedded telemetry service.""" + +from contextlib import asynccontextmanager +from typing import AsyncIterator + +from fastapi import FastAPI + +from server.logger import logger +from server.telemetry.service import telemetry_service + + +@asynccontextmanager +async def telemetry_lifespan(app: FastAPI) -> AsyncIterator[None]: + """FastAPI lifespan context manager for telemetry service. + + This is called automatically during FastAPI application startup and shutdown, + managing the lifecycle of the telemetry background tasks. 
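+    Telemetry failures are deliberately non-fatal: errors during startup or shutdown are logged and the server keeps running without telemetry.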
+ + Startup: Initializes and starts background collection and upload tasks + Shutdown: Gracefully stops background tasks + """ + logger.info("Starting telemetry service lifespan") + + # Startup - start background tasks + try: + await telemetry_service.start() + logger.info("Telemetry service started successfully") + except Exception as e: + logger.error(f"Error starting telemetry service: {e}", exc_info=True) + # Don't fail server startup if telemetry fails + + yield # Server runs here + + # Shutdown - stop background tasks + try: + await telemetry_service.stop() + logger.info("Telemetry service stopped successfully") + except Exception as e: + logger.error(f"Error stopping telemetry service: {e}", exc_info=True) +``` + +#### 4.3.3 Enterprise Server Integration + +**File**: `enterprise/saas_server.py` (add to existing file) + +```python +# Add import at top of file with other imports +from server.telemetry.lifecycle import telemetry_lifespan # noqa: E402 + +# Later in the file, after base_app is imported from openhands.server.app +# Add telemetry lifespan to the application +from openhands.server.app import lifespans + +# Append telemetry lifespan to existing lifespans +lifespans.append(telemetry_lifespan) + +# Note: The base_app already uses combine_lifespans(*lifespans) in openhands/server/app.py +# so adding to the lifespans list will automatically include it ``` ### 4.4 License Warning System @@ -599,107 +892,100 @@ export function LicenseWarningBanner() { } ``` -### 4.5 Cronjob Configuration +### 4.5 Environment Configuration -The cronjob configurations will be deployed via the OpenHands-Cloud helm charts. +The telemetry service is configured entirely through environment variables. No Kubernetes CronJobs or separate worker processes are required - the service runs automatically within the main enterprise server process. -#### 4.5.1 Collection Cronjob +#### 4.5.1 Required Environment Variables -The collection cronjob runs weekly to gather metrics: +```bash +# Replicated API configuration (required for upload) +REPLICATED_PUBLISHABLE_KEY= +REPLICATED_APP_SLUG=openhands-enterprise -```yaml -# charts/openhands/templates/telemetry-collection-cronjob.yaml -apiVersion: batch/v1 -kind: CronJob -metadata: - name: {{ include "openhands.fullname" . }}-telemetry-collection - labels: - {{- include "openhands.labels" . | nindent 4 }} -spec: - schedule: "0 2 * * 0" # Weekly on Sunday at 2 AM - jobTemplate: - spec: - template: - spec: - containers: - - name: telemetry-collector - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - env: - {{- include "openhands.env" . 
| nindent 12 }} - command: - - python - - -c - - | - from enterprise.storage.maintenance_task import MaintenanceTask, MaintenanceTaskStatus - from enterprise.storage.database import session_maker - from enterprise.server.telemetry.collection_processor import TelemetryCollectionProcessor +# Optional: Explicit admin email (recommended) +OPENHANDS_ADMIN_EMAIL=admin@company.com - # Create collection task - processor = TelemetryCollectionProcessor() - task = MaintenanceTask() - task.set_processor(processor) - task.status = MaintenanceTaskStatus.PENDING - - with session_maker() as session: - session.add(task) - session.commit() - restartPolicy: OnFailure +# Optional: Custom intervals (defaults shown) +TELEMETRY_COLLECTION_INTERVAL_DAYS=7 +TELEMETRY_UPLOAD_INTERVAL_HOURS=24 +TELEMETRY_WARNING_THRESHOLD_DAYS=4 ``` -#### 4.5.2 Upload Cronjob +#### 4.5.2 Helm Chart Values -The upload cronjob runs daily to send metrics to Replicated: +**File**: `charts/openhands/values.yaml` (additions) ```yaml -# charts/openhands/templates/telemetry-upload-cronjob.yaml -apiVersion: batch/v1 -kind: CronJob +# Telemetry configuration +telemetry: + # Replicated configuration + replicated: + publishableKey: "" # Set via secret or values + appSlug: "openhands-enterprise" + + # Optional admin email + adminEmail: "" + + # Collection and upload intervals + collectionIntervalDays: 7 + uploadIntervalHours: 24 + warningThresholdDays: 4 +``` + +#### 4.5.3 Helm Secret Configuration + +**File**: `charts/openhands/templates/telemetry-secret.yaml` + +```yaml +{{- if .Values.telemetry.replicated.publishableKey }} +apiVersion: v1 +kind: Secret metadata: - name: {{ include "openhands.fullname" . }}-telemetry-upload + name: {{ include "openhands.fullname" . }}-telemetry labels: {{- include "openhands.labels" . | nindent 4 }} -spec: - schedule: "0 3 * * *" # Daily at 3 AM - jobTemplate: - spec: - template: - spec: - containers: - - name: telemetry-uploader - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" - env: - {{- include "openhands.env" . | nindent 12 }} - - name: REPLICATED_PUBLISHABLE_KEY - valueFrom: - secretKeyRef: - name: {{ include "openhands.fullname" . }}-replicated-config - key: publishable-key - - name: REPLICATED_APP_SLUG - value: {{ .Values.telemetry.replicatedAppSlug | default "openhands-enterprise" | quote }} - command: - - python - - -c - - | - from enterprise.storage.maintenance_task import MaintenanceTask, MaintenanceTaskStatus - from enterprise.storage.database import session_maker - from enterprise.server.telemetry.upload_processor import TelemetryUploadProcessor - import os - - # Create upload task - processor = TelemetryUploadProcessor( - replicated_publishable_key=os.getenv('REPLICATED_PUBLISHABLE_KEY'), - replicated_app_slug=os.getenv('REPLICATED_APP_SLUG', 'openhands-enterprise') - ) - task = MaintenanceTask() - task.set_processor(processor) - task.status = MaintenanceTaskStatus.PENDING - - with session_maker() as session: - session.add(task) - session.commit() - restartPolicy: OnFailure +type: Opaque +stringData: + replicated-publishable-key: {{ .Values.telemetry.replicated.publishableKey }} +{{- end }} ``` +#### 4.5.4 Deployment Environment Variables + +**File**: `charts/openhands/templates/deployment.yaml` (additions to env section) + +```yaml +# Add to existing deployment's container env section +{{- if .Values.telemetry.replicated.publishableKey }} +- name: REPLICATED_PUBLISHABLE_KEY + valueFrom: + secretKeyRef: + name: {{ include "openhands.fullname" . 
}}-telemetry + key: replicated-publishable-key +- name: REPLICATED_APP_SLUG + value: {{ .Values.telemetry.replicated.appSlug | quote }} +{{- end }} +{{- if .Values.telemetry.adminEmail }} +- name: OPENHANDS_ADMIN_EMAIL + value: {{ .Values.telemetry.adminEmail | quote }} +{{- end }} +{{- if .Values.telemetry.collectionIntervalDays }} +- name: TELEMETRY_COLLECTION_INTERVAL_DAYS + value: {{ .Values.telemetry.collectionIntervalDays | quote }} +{{- end }} +{{- if .Values.telemetry.uploadIntervalHours }} +- name: TELEMETRY_UPLOAD_INTERVAL_HOURS + value: {{ .Values.telemetry.uploadIntervalHours | quote }} +{{- end }} +{{- if .Values.telemetry.warningThresholdDays }} +- name: TELEMETRY_WARNING_THRESHOLD_DAYS + value: {{ .Values.telemetry.warningThresholdDays | quote }} +{{- end }} +``` + +**Note**: Unlike the previous CronJob-based design, this embedded approach requires no separate Kubernetes resources. The telemetry service starts automatically with the main application server and cannot be disabled without modifying the application code itself. + ## 5. Implementation Plan All implementation must pass existing lints and tests. New functionality requires comprehensive unit tests with >90% coverage. Integration tests should verify end-to-end telemetry flow including collection, storage, upload, and warning display. @@ -748,26 +1034,45 @@ Implement the pluggable metrics collection system with registry and base classes **Demo**: Developers can create new collectors with a single file change using the @register_collector decorator. -### 5.3 Collection and Upload Processors (M3) +### 5.3 Embedded Telemetry Service (M3) **Repository**: OpenHands -Implement maintenance task processors for collecting metrics and uploading to Replicated. +Implement the embedded telemetry service that runs within the main enterprise server process using AsyncIO background tasks. 
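+
+As a concrete starting point for the unit tests listed below, the interval gating can be exercised without a real database. The following is a hypothetical sketch of one test in `enterprise/tests/unit/telemetry/test_service.py`, assuming `pytest`, `unittest.mock`, and the module layout from section 4.3.1:
+
+```python
+from datetime import datetime, timedelta, timezone
+from unittest.mock import MagicMock, patch
+
+from server.telemetry.service import TelemetryService
+
+
+def test_should_collect_when_last_collection_is_stale():
+    service = TelemetryService()
+
+    # Simulate a last collection 8 days ago (past the 7-day default interval)
+    stale_metric = MagicMock()
+    stale_metric.collected_at = datetime.now(timezone.utc) - timedelta(days=8)
+
+    session = MagicMock()
+    session.query.return_value.order_by.return_value.first.return_value = stale_metric
+
+    # session_maker is used as a context manager inside _should_collect
+    with patch('server.telemetry.service.session_maker') as maker:
+        maker.return_value.__enter__.return_value = session
+        assert service._should_collect() is True
+```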
-#### 5.3.1 OpenHands - Collection Processor +#### 5.3.1 OpenHands - Telemetry Service -- [ ] `enterprise/server/telemetry/collection_processor.py` -- [ ] `enterprise/tests/unit/telemetry/test_collection_processor.py` +- [ ] `enterprise/server/telemetry/__init__.py` - Package initialization +- [ ] `enterprise/server/telemetry/service.py` - Core TelemetryService singleton class +- [ ] `enterprise/server/telemetry/lifecycle.py` - FastAPI lifespan integration +- [ ] `enterprise/tests/unit/telemetry/test_service.py` - Service unit tests +- [ ] `enterprise/tests/unit/telemetry/test_lifecycle.py` - Lifespan integration tests -#### 5.3.2 OpenHands - Upload Processor +**Key Features**: +- Singleton service pattern with thread-safe initialization +- Independent AsyncIO background tasks for collection (7 days) and upload (24 hours) +- Graceful startup and shutdown via FastAPI lifespan events +- Automatic recovery from errors without crashing main server -- [ ] `enterprise/server/telemetry/upload_processor.py` -- [ ] `enterprise/tests/unit/telemetry/test_upload_processor.py` +#### 5.3.2 OpenHands - Server Integration + +- [ ] Update `enterprise/saas_server.py` to register telemetry lifespan +- [ ] Update `openhands/server/app.py` lifespans list (if needed) +- [ ] `enterprise/tests/integration/test_telemetry_embedded.py` - End-to-end integration tests + +**Integration Points**: +- Add `telemetry_lifespan` to the FastAPI app's lifespan list +- No changes to request handling code required +- Zero overhead on normal operations #### 5.3.3 OpenHands - Integration Tests -- [ ] `enterprise/tests/integration/test_telemetry_flow.py` +- [ ] `enterprise/tests/integration/test_telemetry_flow.py` - Full collection and upload cycle +- [ ] Test startup/shutdown behavior +- [ ] Test interval timing and database state +- [ ] Test Replicated API integration (mocked) +- [ ] Test error handling and recovery -**Demo**: Metrics are automatically collected weekly and uploaded daily to Replicated vendor portal. +**Demo**: Telemetry service starts automatically with the enterprise server, collects metrics weekly, uploads daily to Replicated, and cannot be disabled without code modification. ### 5.4 License Warning API (M4) @@ -802,32 +1107,72 @@ Implement the frontend warning banner component and integration. **Demo**: License warnings appear in UI when telemetry uploads fail for >4 days, with accurate expiration countdown. -### 5.6 Helm Chart Deployment Configuration (M6) +### 5.6 Helm Chart Environment Configuration (M6) **Repository**: OpenHands-Cloud -Create Kubernetes cronjob configurations and deployment scripts. +Configure environment variables and secrets for the embedded telemetry service. No separate Kubernetes resources (CronJobs, workers) are required. -#### 5.6.1 OpenHands-Cloud - Cronjob Manifests +#### 5.6.1 OpenHands-Cloud - Secret Management -- [ ] `charts/openhands/templates/telemetry-collection-cronjob.yaml` -- [ ] `charts/openhands/templates/telemetry-upload-cronjob.yaml` +- [ ] `charts/openhands/templates/telemetry-secret.yaml` - Replicated API key secret +- [ ] Configure secret in deployment to provide `REPLICATED_PUBLISHABLE_KEY` -#### 5.6.2 OpenHands-Cloud - Configuration Management +**Secret Template**: +```yaml +{{- if .Values.telemetry.replicated.publishableKey }} +apiVersion: v1 +kind: Secret +metadata: + name: {{ include "openhands.fullname" . 
}}-telemetry +type: Opaque +stringData: + replicated-publishable-key: {{ .Values.telemetry.replicated.publishableKey }} +{{- end }} +``` + +#### 5.6.2 OpenHands-Cloud - Values Configuration -- [ ] `charts/openhands/templates/replicated-secret.yaml` - [ ] Update `charts/openhands/values.yaml` with telemetry configuration options: ```yaml # Add to values.yaml telemetry: - enabled: true - replicatedAppSlug: "openhands-enterprise" - adminEmail: "" # Optional: admin email for customer identification - - # Add to deployment environment variables - env: - OPENHANDS_ADMIN_EMAIL: "{{ .Values.telemetry.adminEmail }}" + replicated: + publishableKey: "" # Required for upload (set via secret or sealed secret) + appSlug: "openhands-enterprise" + adminEmail: "" # Optional: explicit admin email + collectionIntervalDays: 7 + uploadIntervalHours: 24 + warningThresholdDays: 4 ``` +#### 5.6.3 OpenHands-Cloud - Deployment Environment Variables + +- [ ] Update `charts/openhands/templates/deployment.yaml` to inject telemetry environment variables: + ```yaml + # Add to deployment container env section + {{- if .Values.telemetry.replicated.publishableKey }} + - name: REPLICATED_PUBLISHABLE_KEY + valueFrom: + secretKeyRef: + name: {{ include "openhands.fullname" . }}-telemetry + key: replicated-publishable-key + - name: REPLICATED_APP_SLUG + value: {{ .Values.telemetry.replicated.appSlug | quote }} + {{- end }} + {{- if .Values.telemetry.adminEmail }} + - name: OPENHANDS_ADMIN_EMAIL + value: {{ .Values.telemetry.adminEmail | quote }} + {{- end }} + - name: TELEMETRY_COLLECTION_INTERVAL_DAYS + value: {{ .Values.telemetry.collectionIntervalDays | default "7" | quote }} + - name: TELEMETRY_UPLOAD_INTERVAL_HOURS + value: {{ .Values.telemetry.uploadIntervalHours | default "24" | quote }} + - name: TELEMETRY_WARNING_THRESHOLD_DAYS + value: {{ .Values.telemetry.warningThresholdDays | default "4" | quote }} + ``` + +**Note**: The telemetry service runs automatically within the main deployment - no CronJobs or additional pods are created. + **Demo**: Complete telemetry system deployed via helm chart with configurable collection intervals and Replicated integration. ### 5.7 Documentation and Enhanced Collectors (M7)