test(enterprise): Add comprehensive test coverage for telemetry service

- Add tests for _collect_metrics method covering success, collector failures, and skip logic
- Add tests for _initial_collection_check for different database states
- Add tests for _upload_pending_metrics covering all edge cases:
  * No Replicated SDK available
  * Missing publishable key
  * No pending metrics
  * No admin email
  * Successful uploads
  * Partial upload failures
- Add tests for _get_or_create_identity edge cases:
  * Replicated SDK failure with UUID fallback
  * No Replicated SDK with UUID generation
- Add comprehensive error handling tests for:
  * _should_collect database errors
  * _should_upload database errors
  * License warning status errors
  * Admin email query errors

These tests exercise critical code paths that weren't previously covered,
including error handling, fallback logic, and edge cases in the two-phase
telemetry service implementation.

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands 2025-12-19 16:59:38 +00:00
parent 0b7610054b
commit 2bbb780001

View File

@ -530,3 +530,479 @@ class TestLifecycleManagement:
await telemetry_service.stop()
assert telemetry_service._shutdown_event.is_set()
class TestCollectMetrics:
"""Test metric collection functionality."""
@pytest.mark.asyncio
@patch('server.telemetry.service.CollectorRegistry')
@patch('server.telemetry.service.session_maker')
async def test_collect_metrics_success(
self, mock_session_maker, mock_registry_class, telemetry_service
):
"""Test successful metrics collection from collectors."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
# Mock collector registry and collectors
mock_registry = MagicMock()
mock_registry_class.return_value = mock_registry
mock_collector = MagicMock()
mock_collector.collector_name = 'test_collector'
mock_collector.should_collect.return_value = True
# Mock metric results
mock_result1 = MagicMock()
mock_result1.key = 'metric1'
mock_result1.value = 100
mock_result2 = MagicMock()
mock_result2.key = 'metric2'
mock_result2.value = 200
mock_collector.collect.return_value = [mock_result1, mock_result2]
mock_registry.get_all_collectors.return_value = [mock_collector]
await telemetry_service._collect_metrics()
# Verify session operations
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
@patch('server.telemetry.service.CollectorRegistry')
@patch('server.telemetry.service.session_maker')
async def test_collect_metrics_collector_failure(
self, mock_session_maker, mock_registry_class, telemetry_service
):
"""Test metrics collection when a collector fails."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_registry = MagicMock()
mock_registry_class.return_value = mock_registry
# First collector succeeds
mock_collector1 = MagicMock()
mock_collector1.collector_name = 'good_collector'
mock_collector1.should_collect.return_value = True
mock_result = MagicMock()
mock_result.key = 'metric1'
mock_result.value = 100
mock_collector1.collect.return_value = [mock_result]
# Second collector fails
mock_collector2 = MagicMock()
mock_collector2.collector_name = 'bad_collector'
mock_collector2.should_collect.return_value = True
mock_collector2.collect.side_effect = Exception('Collection error')
mock_registry.get_all_collectors.return_value = [
mock_collector1,
mock_collector2,
]
await telemetry_service._collect_metrics()
# Should still store metrics from successful collector
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
@patch('server.telemetry.service.CollectorRegistry')
@patch('server.telemetry.service.session_maker')
async def test_collect_metrics_skip_collector(
self, mock_session_maker, mock_registry_class, telemetry_service
):
"""Test skipping collectors that shouldn't collect."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_registry = MagicMock()
mock_registry_class.return_value = mock_registry
mock_collector = MagicMock()
mock_collector.collector_name = 'skip_collector'
mock_collector.should_collect.return_value = False
mock_registry.get_all_collectors.return_value = [mock_collector]
await telemetry_service._collect_metrics()
# Collector.collect() should not be called
mock_collector.collect.assert_not_called()
# Should still commit empty metrics
mock_session.add.assert_called_once()
mock_session.commit.assert_called_once()
@pytest.mark.asyncio
@patch('server.telemetry.service.CollectorRegistry')
@patch('server.telemetry.service.session_maker')
async def test_collect_metrics_database_error(
self, mock_session_maker, mock_registry_class, telemetry_service
):
"""Test handling of database errors during metrics collection."""
mock_session_maker.side_effect = Exception('Database connection error')
# Should not raise exception
await telemetry_service._collect_metrics()
class TestInitialCollectionCheck:
"""Test initial collection check functionality."""
@pytest.mark.asyncio
@patch('server.telemetry.service.session_maker')
async def test_initial_collection_no_metrics(
self, mock_session_maker, telemetry_service
):
"""Test initial collection when no metrics exist."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_session.query.return_value.count.return_value = 0
with patch.object(
telemetry_service, '_collect_metrics', new_callable=AsyncMock
) as mock_collect:
await telemetry_service._initial_collection_check()
mock_collect.assert_called_once()
@pytest.mark.asyncio
@patch('server.telemetry.service.session_maker')
async def test_initial_collection_metrics_exist(
self, mock_session_maker, telemetry_service
):
"""Test initial collection when metrics already exist."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_session.query.return_value.count.return_value = 5
with patch.object(
telemetry_service, '_collect_metrics', new_callable=AsyncMock
) as mock_collect:
await telemetry_service._initial_collection_check()
mock_collect.assert_not_called()
@pytest.mark.asyncio
@patch('server.telemetry.service.session_maker')
async def test_initial_collection_database_error(
self, mock_session_maker, telemetry_service
):
"""Test error handling during initial collection check."""
mock_session_maker.side_effect = Exception('Database error')
# Should not raise exception
await telemetry_service._initial_collection_check()
class TestUploadPendingMetrics:
"""Test metrics upload functionality."""
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', False)
async def test_upload_no_replicated_sdk(self, telemetry_service):
"""Test upload skipped when Replicated SDK not available."""
with patch.object(
telemetry_service, '_get_admin_email'
) as mock_get_admin_email:
await telemetry_service._upload_pending_metrics()
mock_get_admin_email.assert_not_called()
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
async def test_upload_no_publishable_key(self, telemetry_service):
"""Test upload skipped when publishable key is missing."""
telemetry_service.replicated_publishable_key = None
with patch.object(
telemetry_service, '_get_admin_email'
) as mock_get_admin_email:
await telemetry_service._upload_pending_metrics()
mock_get_admin_email.assert_not_called()
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
@patch('server.telemetry.service.session_maker')
async def test_upload_no_pending_metrics(
self, mock_session_maker, telemetry_service
):
"""Test upload when no pending metrics exist."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_session.query.return_value.filter.return_value.order_by.return_value.all.return_value = (
[]
)
await telemetry_service._upload_pending_metrics()
# Should not attempt to get admin email
mock_session.commit.assert_not_called()
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
@patch('server.telemetry.service.session_maker')
async def test_upload_no_admin_email(self, mock_session_maker, telemetry_service):
"""Test upload skipped when no admin email available."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
mock_metric = MagicMock()
mock_session.query.return_value.filter.return_value.order_by.return_value.all.return_value = [
mock_metric
]
with patch.object(
telemetry_service, '_get_admin_email', return_value=None
) as mock_get_admin:
await telemetry_service._upload_pending_metrics()
mock_get_admin.assert_called_once()
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
@patch('server.telemetry.service.ReplicatedClient')
@patch('server.telemetry.service.InstanceStatus')
@patch('server.telemetry.service.session_maker')
async def test_upload_success(
self,
mock_session_maker,
mock_status,
mock_client_class,
telemetry_service,
):
"""Test successful metrics upload."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
# Mock pending metrics
mock_metric = MagicMock()
mock_metric.id = 1
mock_metric.metrics_data = {'metric1': 100, 'metric2': 200}
mock_metric.upload_attempts = 0
mock_session.query.return_value.filter.return_value.order_by.return_value.all.return_value = [
mock_metric
]
# Mock Replicated client
mock_instance = MagicMock()
mock_instance.instance_id = 'inst-456'
mock_customer = MagicMock()
mock_customer.customer_id = 'cust-123'
mock_customer.get_or_create_instance.return_value = mock_instance
mock_client = MagicMock()
mock_client.customer.get_or_create.return_value = mock_customer
mock_client_class.return_value = mock_client
# Mock identity
mock_identity = MagicMock()
with patch.object(
telemetry_service, '_get_admin_email', return_value='admin@example.com'
):
with patch.object(
telemetry_service,
'_get_or_create_identity',
return_value=mock_identity,
):
await telemetry_service._upload_pending_metrics()
# Verify metrics were sent
assert mock_instance.send_metric.call_count == 2
assert mock_metric.uploaded_at is not None
assert mock_metric.upload_attempts == 1
assert mock_metric.last_upload_error is None
@pytest.mark.asyncio
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
@patch('server.telemetry.service.ReplicatedClient')
@patch('server.telemetry.service.InstanceStatus')
@patch('server.telemetry.service.session_maker')
async def test_upload_partial_failure(
self,
mock_session_maker,
mock_status,
mock_client_class,
telemetry_service,
):
"""Test upload when some metrics fail to upload."""
mock_session = MagicMock()
mock_session.__enter__ = MagicMock(return_value=mock_session)
mock_session.__exit__ = MagicMock(return_value=None)
mock_session_maker.return_value = mock_session
# Mock pending metrics
mock_metric1 = MagicMock()
mock_metric1.id = 1
mock_metric1.metrics_data = {'metric1': 100}
mock_metric1.upload_attempts = 0
mock_metric1.uploaded_at = None
mock_metric1.last_upload_error = None
mock_metric2 = MagicMock()
mock_metric2.id = 2
mock_metric2.metrics_data = {'metric2': 200}
mock_metric2.upload_attempts = 0
mock_metric2.uploaded_at = None
mock_metric2.last_upload_error = None
mock_session.query.return_value.filter.return_value.order_by.return_value.all.return_value = [
mock_metric1,
mock_metric2,
]
# Mock Replicated client
mock_instance = MagicMock()
mock_instance.instance_id = 'inst-456'
# First metric succeeds, second fails
call_count = [0]
def side_effect_send(*args):
call_count[0] += 1
if call_count[0] > 1:
raise Exception('Upload error')
mock_instance.send_metric.side_effect = side_effect_send
mock_customer = MagicMock()
mock_customer.customer_id = 'cust-123'
mock_customer.get_or_create_instance.return_value = mock_instance
mock_client = MagicMock()
mock_client.customer.get_or_create.return_value = mock_customer
mock_client_class.return_value = mock_client
mock_identity = MagicMock()
with patch.object(
telemetry_service, '_get_admin_email', return_value='admin@example.com'
):
with patch.object(
telemetry_service,
'_get_or_create_identity',
return_value=mock_identity,
):
await telemetry_service._upload_pending_metrics()
# First metric should be marked as uploaded
assert mock_metric1.uploaded_at is not None
# Second metric should have error recorded
assert mock_metric2.last_upload_error is not None
assert mock_metric2.uploaded_at is None
class TestGetOrCreateIdentityEdgeCases:
"""Test edge cases in identity creation."""
@patch('server.telemetry.service.REPLICATED_AVAILABLE', True)
@patch('server.telemetry.service.ReplicatedClient')
def test_identity_replicated_failure_fallback(
self, mock_client_class, telemetry_service, mock_session
):
"""Test fallback to UUID when Replicated SDK fails."""
mock_session.query.return_value.filter.return_value.first.return_value = None
# Mock Replicated to raise exception
mock_client_class.side_effect = Exception('Replicated API error')
with patch('server.telemetry.service.TelemetryIdentity') as mock_identity_class:
mock_identity = MagicMock()
mock_identity.customer_id = None
mock_identity.instance_id = None
mock_identity_class.return_value = mock_identity
identity = telemetry_service._get_or_create_identity(
mock_session, 'test@example.com'
)
# Should have fallback UUID
assert identity.instance_id is not None
# Customer ID should be set to email
assert identity.customer_id == 'test@example.com'
@patch('server.telemetry.service.REPLICATED_AVAILABLE', False)
def test_identity_no_replicated_uses_uuid(self, telemetry_service, mock_session):
"""Test UUID generation when Replicated SDK not available."""
mock_session.query.return_value.filter.return_value.first.return_value = None
with patch('server.telemetry.service.TelemetryIdentity') as mock_identity_class:
mock_identity = MagicMock()
mock_identity.customer_id = None
mock_identity.instance_id = None
mock_identity_class.return_value = mock_identity
identity = telemetry_service._get_or_create_identity(
mock_session, 'test@example.com'
)
# Should have UUID instance_id
assert identity.instance_id is not None
assert identity.customer_id == 'test@example.com'
class TestErrorHandling:
"""Test error handling in various scenarios."""
@patch('server.telemetry.service.session_maker')
def test_should_collect_error_handling(
self, mock_session_maker, telemetry_service
):
"""Test error handling in _should_collect."""
mock_session_maker.side_effect = Exception('Database error')
result = telemetry_service._should_collect()
assert result is False
@patch('server.telemetry.service.session_maker')
def test_should_upload_error_handling(
self, mock_session_maker, telemetry_service
):
"""Test error handling in _should_upload."""
mock_session_maker.side_effect = Exception('Database error')
result = telemetry_service._should_upload()
assert result is False
@patch('server.telemetry.service.session_maker')
def test_license_warning_status_error(
self, mock_session_maker, telemetry_service
):
"""Test error handling in get_license_warning_status."""
mock_session_maker.side_effect = Exception('Database error')
status = telemetry_service.get_license_warning_status()
assert status['should_warn'] is False
assert status['days_since_upload'] is None
assert 'Error' in status['message']
@patch('server.telemetry.service.os.getenv')
def test_admin_email_query_error(
self, mock_getenv, telemetry_service, mock_session
):
"""Test error handling when user query fails."""
mock_getenv.return_value = None
mock_session.query.side_effect = Exception('Query error')
email = telemetry_service._get_admin_email(mock_session)
assert email is None