mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
fix(enterprise): Remove trailing whitespace from telemetry design doc
Run enterprise linter to fix trailing whitespace issues that were causing CI lint checks to fail.

Changes:
- Removed trailing whitespace from empty lines throughout the document
- No content changes, only whitespace cleanup

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
parent
b3bf509634
commit
653be34c88
@ -123,7 +123,7 @@ Replicated uses a **publishable key** model (similar to Stripe and other modern
|
||||
- **Read-Only Access**: Cannot access other customers' data or modify vendor account settings
|
||||
- **Shared Across Deployments**: The same key is used for all customer installations of your application
|
||||
- **Not Customer-Specific**: Unlike license keys, the publishable key identifies your vendor account, not individual customers
|
||||
|
||||
|
||||
- **Customer Identification**: Individual customers are identified by their email address or instance fingerprint, not by different API keys
|
||||
|
||||
This security model allows vendors to embed telemetry collection directly in their application without exposing sensitive credentials. The key should be stored in environment variables for configurability, but can be safely committed to source code if needed.
|
||||
@ -348,59 +348,59 @@ from telemetry.registry import CollectorRegistry
|
||||
|
||||
class TelemetryService:
|
||||
"""Singleton service for managing embedded telemetry collection and upload.
|
||||
|
||||
|
||||
This service runs as part of the main enterprise server process using AsyncIO
|
||||
background tasks. It starts automatically during FastAPI application startup
|
||||
and runs independently without requiring external CronJobs or maintenance workers.
|
||||
|
||||
|
||||
Two-Phase Scheduling:
|
||||
---------------------
|
||||
The service uses adaptive scheduling to minimize time-to-visibility for new installations:
|
||||
|
||||
|
||||
Phase 1 (Bootstrap - No Identity Established):
|
||||
- Runs when no user has authenticated yet (no admin email available)
|
||||
- Checks every 3 minutes for first user authentication
|
||||
- Immediately collects and uploads metrics once first user authenticates
|
||||
- Creates Replicated customer and instance identity on first upload
|
||||
|
||||
|
||||
Phase 2 (Normal Operations - Identity Established):
|
||||
- Runs after identity (customer_id + instance_id) is created
|
||||
- Checks every hour (reduced overhead)
|
||||
- Collects metrics every 7 days
|
||||
- Uploads metrics every 24 hours
|
||||
|
||||
|
||||
This ensures new installations become visible to the vendor within minutes of first use,
|
||||
while established installations maintain low resource overhead.
|
||||
"""
|
||||
|
||||
|
||||
_instance: Optional['TelemetryService'] = None
|
||||
_initialized: bool = False
|
||||
|
||||
|
||||
def __new__(cls):
    """Return the shared instance, creating and caching it on the first call."""
    shared = cls._instance
    if shared is None:
        # First construction: allocate the object and remember it on the class.
        shared = super().__new__(cls)
        cls._instance = shared
    return shared
|
||||
|
||||
|
||||
def __init__(self):
|
||||
if self._initialized:
|
||||
return
|
||||
|
||||
|
||||
self._initialized = True
|
||||
self._collection_task: Optional[asyncio.Task] = None
|
||||
self._upload_task: Optional[asyncio.Task] = None
|
||||
self._shutdown_event = asyncio.Event()
|
||||
|
||||
|
||||
# Configuration (from environment or defaults)
|
||||
self.collection_interval_days = int(os.getenv('TELEMETRY_COLLECTION_INTERVAL_DAYS', '7'))
|
||||
self.upload_interval_hours = int(os.getenv('TELEMETRY_UPLOAD_INTERVAL_HOURS', '24'))
|
||||
self.license_warning_threshold_days = int(os.getenv('TELEMETRY_WARNING_THRESHOLD_DAYS', '4'))
|
||||
|
||||
|
||||
# Two-phase scheduling: Before identity is established, check more frequently
|
||||
# Phase 1 (no identity): Check every 3 minutes for first user authentication
|
||||
# Phase 2 (identity exists): Check every hour for normal operations
|
||||
self.bootstrap_check_interval_seconds = 180 # 3 minutes
|
||||
self.normal_check_interval_seconds = 3600 # 1 hour
|
||||
|
||||
|
||||
# Replicated API configuration - intentionally HARDCODED (the key is publishable, not a secret)
|
||||
# This publishable key (replicated_pk_...) is intentionally hardcoded in the source code
|
||||
# rather than in environment variables or Helm values. This makes the telemetry system:
|
||||
@ -415,36 +415,36 @@ class TelemetryService:
|
||||
# - This is the same security model as Stripe's frontend publishable keys
|
||||
self.replicated_publishable_key = "replicated_pk_xxxxxxxxxxxxxxxxxxxxxxxxxx" # TODO: Replace with actual key
|
||||
self.replicated_app_slug = "openhands-enterprise"
|
||||
|
||||
|
||||
logger.info("TelemetryService initialized")
|
||||
|
||||
|
||||
async def start(self):
    """Start the telemetry service background tasks.

    Called automatically during FastAPI application startup via lifespan events.

    Spawns the collection loop, the upload loop, and a one-off initial
    collection check. If either loop task is already set, a warning is
    logged and nothing new is started.
    """
    if self._collection_task is not None or self._upload_task is not None:
        logger.warning("TelemetryService already started")
        return

    logger.info("Starting TelemetryService background tasks")

    # Start independent background loops
    self._collection_task = asyncio.create_task(self._collection_loop())
    self._upload_task = asyncio.create_task(self._upload_loop())

    # Run initial collection if needed (don't wait for 7-day interval).
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected before
    # it finishes.
    self._initial_check_task = asyncio.create_task(self._initial_collection_check())
|
||||
|
||||
|
||||
async def stop(self):
|
||||
"""Stop the telemetry service and perform cleanup.
|
||||
|
||||
|
||||
Called automatically during FastAPI application shutdown via lifespan events.
|
||||
"""
|
||||
logger.info("Stopping TelemetryService")
|
||||
|
||||
|
||||
self._shutdown_event.set()
|
||||
|
||||
|
||||
# Cancel background tasks
|
||||
if self._collection_task:
|
||||
self._collection_task.cancel()
|
||||
@ -452,47 +452,47 @@ class TelemetryService:
|
||||
await self._collection_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
if self._upload_task:
|
||||
self._upload_task.cancel()
|
||||
try:
|
||||
await self._upload_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
logger.info("TelemetryService stopped")
|
||||
|
||||
|
||||
async def _collection_loop(self):
|
||||
"""Background task that checks if metrics collection is needed.
|
||||
|
||||
|
||||
Uses two-phase scheduling:
|
||||
- Phase 1 (bootstrap): Checks every 3 minutes until identity is established
|
||||
- Phase 2 (normal): Checks every hour, collects every 7 days
|
||||
|
||||
|
||||
This ensures rapid first collection after user authentication while maintaining
|
||||
low overhead for ongoing operations.
|
||||
"""
|
||||
logger.info(f"Collection loop started (interval: {self.collection_interval_days} days)")
|
||||
|
||||
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# Determine check interval based on whether identity is established
|
||||
identity_established = self._is_identity_established()
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
else self.bootstrap_check_interval_seconds)
|
||||
|
||||
|
||||
if not identity_established:
|
||||
logger.debug("Identity not yet established, using bootstrap interval (3 minutes)")
|
||||
|
||||
|
||||
# Check if collection is needed
|
||||
if self._should_collect():
|
||||
logger.info("Starting metrics collection")
|
||||
await self._collect_metrics()
|
||||
logger.info("Metrics collection completed")
|
||||
|
||||
|
||||
# Sleep until next check (interval depends on phase)
|
||||
await asyncio.sleep(check_interval)
|
||||
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Collection loop cancelled")
|
||||
break
|
||||
@ -501,29 +501,29 @@ class TelemetryService:
|
||||
# Continue running even if collection fails
|
||||
# Use shorter interval on error to retry sooner
|
||||
await asyncio.sleep(self.bootstrap_check_interval_seconds)
|
||||
|
||||
|
||||
async def _upload_loop(self):
|
||||
"""Background task that checks if metrics upload is needed.
|
||||
|
||||
|
||||
Uses two-phase scheduling:
|
||||
- Phase 1 (bootstrap): Checks every 3 minutes for first user, uploads immediately
|
||||
- Phase 2 (normal): Checks every hour, uploads every 24 hours
|
||||
|
||||
|
||||
When identity is first established, triggers immediate upload to minimize time
|
||||
until vendor visibility. After that, follows normal 24-hour upload schedule.
|
||||
"""
|
||||
logger.info(f"Upload loop started (interval: {self.upload_interval_hours} hours)")
|
||||
|
||||
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# Determine check interval based on whether identity is established
|
||||
identity_established = self._is_identity_established()
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
check_interval = (self.normal_check_interval_seconds if identity_established
|
||||
else self.bootstrap_check_interval_seconds)
|
||||
|
||||
|
||||
if not identity_established:
|
||||
logger.debug("Identity not yet established, using bootstrap interval (3 minutes)")
|
||||
|
||||
|
||||
# Check if upload is needed
|
||||
# In bootstrap phase, attempt upload whenever there are pending metrics
|
||||
# (upload will be skipped internally if no admin email available)
|
||||
@ -531,17 +531,17 @@ class TelemetryService:
|
||||
logger.info("Starting metrics upload")
|
||||
was_established_before = identity_established
|
||||
await self._upload_pending_metrics()
|
||||
|
||||
|
||||
# If identity was just established, it will be created during upload
|
||||
# Continue with short interval for one more cycle to ensure upload succeeds
|
||||
if not was_established_before and self._is_identity_established():
|
||||
logger.info("Identity just established - first upload completed")
|
||||
|
||||
|
||||
logger.info("Metrics upload completed")
|
||||
|
||||
|
||||
# Sleep until next check (interval depends on phase)
|
||||
await asyncio.sleep(check_interval)
|
||||
|
||||
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Upload loop cancelled")
|
||||
break
|
||||
@ -550,7 +550,7 @@ class TelemetryService:
|
||||
# Continue running even if upload fails
|
||||
# Use shorter interval on error to retry sooner
|
||||
await asyncio.sleep(self.bootstrap_check_interval_seconds)
|
||||
|
||||
|
||||
async def _initial_collection_check(self):
|
||||
"""Check on startup if initial collection is needed."""
|
||||
try:
|
||||
@ -561,10 +561,10 @@ class TelemetryService:
|
||||
await self._collect_metrics()
|
||||
except Exception as e:
|
||||
logger.error(f"Error during initial collection check: {e}")
|
||||
|
||||
|
||||
def _is_identity_established(self) -> bool:
|
||||
"""Check if telemetry identity has been established.
|
||||
|
||||
|
||||
Returns True if we have both customer_id and instance_id in the database,
|
||||
indicating that at least one user has authenticated and we can send telemetry.
|
||||
"""
|
||||
@ -573,15 +573,15 @@ class TelemetryService:
|
||||
identity = session.query(TelemetryIdentity).filter(
|
||||
TelemetryIdentity.id == 1
|
||||
).first()
|
||||
|
||||
|
||||
# Identity is established if we have both customer_id and instance_id
|
||||
return (identity is not None and
|
||||
identity.customer_id is not None and
|
||||
return (identity is not None and
|
||||
identity.customer_id is not None and
|
||||
identity.instance_id is not None)
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking identity status: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def _should_collect(self) -> bool:
|
||||
"""Check if 7 days have passed since last collection."""
|
||||
try:
|
||||
@ -591,16 +591,16 @@ class TelemetryService:
|
||||
.order_by(TelemetryMetrics.collected_at.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
if not last_metric:
|
||||
return True # First collection
|
||||
|
||||
|
||||
days_since = (datetime.now(timezone.utc) - last_metric.collected_at).days
|
||||
return days_since >= self.collection_interval_days
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking collection status: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def _should_upload(self) -> bool:
|
||||
"""Check if 24 hours have passed since last upload."""
|
||||
try:
|
||||
@ -611,31 +611,31 @@ class TelemetryService:
|
||||
.order_by(TelemetryMetrics.uploaded_at.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
if not last_uploaded:
|
||||
# Check if we have any pending metrics to upload
|
||||
pending_count = session.query(TelemetryMetrics).filter(
|
||||
TelemetryMetrics.uploaded_at.is_(None)
|
||||
).count()
|
||||
return pending_count > 0
|
||||
|
||||
|
||||
hours_since = (datetime.now(timezone.utc) - last_uploaded.uploaded_at).total_seconds() / 3600
|
||||
return hours_since >= self.upload_interval_hours
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking upload status: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def _collect_metrics(self):
|
||||
"""Collect metrics from all registered collectors and store in database."""
|
||||
try:
|
||||
# Get all registered collectors
|
||||
registry = CollectorRegistry()
|
||||
collectors = registry.get_all_collectors()
|
||||
|
||||
|
||||
# Collect metrics from each collector
|
||||
all_metrics = {}
|
||||
collector_results = {}
|
||||
|
||||
|
||||
for collector in collectors:
|
||||
try:
|
||||
if collector.should_collect():
|
||||
@ -647,7 +647,7 @@ class TelemetryService:
|
||||
except Exception as e:
|
||||
logger.error(f"Collector {collector.collector_name} failed: {e}", exc_info=True)
|
||||
collector_results[collector.collector_name] = f"error: {str(e)}"
|
||||
|
||||
|
||||
# Store metrics in database
|
||||
with session_maker() as session:
|
||||
telemetry_record = TelemetryMetrics(
|
||||
@ -656,21 +656,21 @@ class TelemetryService:
|
||||
)
|
||||
session.add(telemetry_record)
|
||||
session.commit()
|
||||
|
||||
|
||||
logger.info(f"Stored {len(all_metrics)} metrics in database")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during metrics collection: {e}", exc_info=True)
|
||||
|
||||
|
||||
async def _upload_pending_metrics(self):
|
||||
"""Upload pending metrics to Replicated."""
|
||||
if not self.replicated_publishable_key:
|
||||
logger.warning("REPLICATED_PUBLISHABLE_KEY not set, skipping upload")
|
||||
return
|
||||
|
||||
|
||||
try:
|
||||
from replicated import ReplicatedClient, InstanceStatus
|
||||
|
||||
|
||||
# Get pending metrics
|
||||
with session_maker() as session:
|
||||
pending_metrics = (
|
||||
@ -679,20 +679,20 @@ class TelemetryService:
|
||||
.order_by(TelemetryMetrics.collected_at)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
if not pending_metrics:
|
||||
logger.info("No pending metrics to upload")
|
||||
return
|
||||
|
||||
|
||||
# Get admin email - skip if not available
|
||||
admin_email = self._get_admin_email(session)
|
||||
if not admin_email:
|
||||
logger.warning("No admin email available, skipping upload")
|
||||
return
|
||||
|
||||
|
||||
# Get or create identity
|
||||
identity = self._get_or_create_identity(session, admin_email)
|
||||
|
||||
|
||||
# Initialize Replicated client with publishable key
|
||||
# This publishable key is intentionally embedded in the application and shared
|
||||
# across all customer deployments. It's safe to use here because:
|
||||
@ -703,47 +703,47 @@ class TelemetryService:
|
||||
publishable_key=self.replicated_publishable_key,
|
||||
app_slug=self.replicated_app_slug
|
||||
)
|
||||
|
||||
|
||||
# Track how many pending metrics are uploaded successfully
|
||||
successful_count = 0
|
||||
# Get or create customer and instance
|
||||
customer = client.customer.get_or_create(email_address=admin_email)
|
||||
instance = customer.get_or_create_instance()
|
||||
|
||||
|
||||
# Update identity with Replicated IDs
|
||||
identity.customer_id = customer.customer_id
|
||||
identity.instance_id = instance.instance_id
|
||||
session.commit()
|
||||
|
||||
|
||||
# Upload each pending metric
|
||||
for metric in pending_metrics:
|
||||
try:
|
||||
# Send individual metrics
|
||||
for key, value in metric.metrics_data.items():
|
||||
instance.send_metric(key, value)
|
||||
|
||||
|
||||
# Update instance status
|
||||
instance.set_status(InstanceStatus.RUNNING)
|
||||
|
||||
|
||||
# Mark as uploaded
|
||||
metric.uploaded_at = datetime.now(timezone.utc)
|
||||
metric.upload_attempts += 1
|
||||
metric.last_upload_error = None
|
||||
successful_count += 1
|
||||
|
||||
|
||||
logger.info(f"Uploaded metric {metric.id} to Replicated")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
metric.upload_attempts += 1
|
||||
metric.last_upload_error = str(e)
|
||||
logger.error(f"Error uploading metric {metric.id}: {e}")
|
||||
|
||||
|
||||
session.commit()
|
||||
logger.info(f"Successfully uploaded {successful_count}/{len(pending_metrics)} metrics")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during metrics upload: {e}", exc_info=True)
|
||||
|
||||
|
||||
def _get_admin_email(self, session) -> Optional[str]:
|
||||
"""Determine admin email from environment or database."""
|
||||
# Try environment variable first
|
||||
@ -751,7 +751,7 @@ class TelemetryService:
|
||||
if admin_email:
|
||||
logger.info("Using admin email from OPENHANDS_ADMIN_EMAIL environment variable")
|
||||
return admin_email
|
||||
|
||||
|
||||
# Try first user who accepted ToS
|
||||
try:
|
||||
first_user = (
|
||||
@ -766,23 +766,23 @@ class TelemetryService:
|
||||
return first_user.email
|
||||
except Exception as e:
|
||||
logger.error(f"Error determining admin email: {e}")
|
||||
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _get_or_create_identity(self, session, admin_email: str) -> TelemetryIdentity:
|
||||
"""Get or create telemetry identity with customer and instance IDs."""
|
||||
identity = session.query(TelemetryIdentity).filter(
|
||||
TelemetryIdentity.id == 1
|
||||
).first()
|
||||
|
||||
|
||||
if not identity:
|
||||
identity = TelemetryIdentity(id=1)
|
||||
session.add(identity)
|
||||
|
||||
|
||||
# Set customer_id to admin email if not already set
|
||||
if not identity.customer_id:
|
||||
identity.customer_id = admin_email
|
||||
|
||||
|
||||
# Generate instance_id using Replicated SDK if not set
|
||||
if not identity.instance_id:
|
||||
try:
|
||||
@ -800,13 +800,13 @@ class TelemetryService:
|
||||
# Generate a fallback UUID if Replicated SDK fails
|
||||
import uuid
|
||||
identity.instance_id = str(uuid.uuid4())
|
||||
|
||||
|
||||
session.commit()
|
||||
return identity
|
||||
|
||||
|
||||
def get_license_warning_status(self) -> dict:
|
||||
"""Get current license warning status for UI display.
|
||||
|
||||
|
||||
Returns:
|
||||
dict with 'should_warn', 'days_since_upload', and 'message' keys
|
||||
"""
|
||||
@ -818,20 +818,20 @@ class TelemetryService:
|
||||
.order_by(TelemetryMetrics.uploaded_at.desc())
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
if not last_uploaded:
|
||||
return {
|
||||
'should_warn': False,
|
||||
'days_since_upload': None,
|
||||
'message': 'No uploads yet'
|
||||
}
|
||||
|
||||
|
||||
days_since_upload = (
|
||||
datetime.now(timezone.utc) - last_uploaded.uploaded_at
|
||||
).days
|
||||
|
||||
|
||||
should_warn = days_since_upload > self.license_warning_threshold_days
|
||||
|
||||
|
||||
return {
|
||||
'should_warn': should_warn,
|
||||
'days_since_upload': days_since_upload,
|
||||
@ -869,15 +869,15 @@ from server.telemetry.service import telemetry_service
|
||||
@asynccontextmanager
|
||||
async def telemetry_lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
"""FastAPI lifespan context manager for telemetry service.
|
||||
|
||||
|
||||
This is called automatically during FastAPI application startup and shutdown,
|
||||
managing the lifecycle of the telemetry background tasks.
|
||||
|
||||
|
||||
Startup: Initializes and starts background collection and upload tasks
|
||||
Shutdown: Gracefully stops background tasks
|
||||
"""
|
||||
logger.info("Starting telemetry service lifespan")
|
||||
|
||||
|
||||
# Startup - start background tasks
|
||||
try:
|
||||
await telemetry_service.start()
|
||||
@ -885,9 +885,9 @@ async def telemetry_lifespan(app: FastAPI) -> AsyncIterator[None]:
|
||||
except Exception as e:
|
||||
logger.error(f"Error starting telemetry service: {e}", exc_info=True)
|
||||
# Don't fail server startup if telemetry fails
|
||||
|
||||
|
||||
yield # Server runs here
|
||||
|
||||
|
||||
# Shutdown - stop background tasks
|
||||
try:
|
||||
await telemetry_service.stop()
|
||||
@ -1058,7 +1058,7 @@ TELEMETRY_WARNING_THRESHOLD_DAYS=4
|
||||
telemetry:
|
||||
# Optional admin email for telemetry identification
|
||||
adminEmail: ""
|
||||
|
||||
|
||||
# Collection and upload intervals
|
||||
collectionIntervalDays: 7
|
||||
uploadIntervalHours: 24
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user