From 6eb32e9ae4f3d2ade2ce07fd258e05372ee54dd7 Mon Sep 17 00:00:00 2001 From: Rohit Malhotra Date: Fri, 29 Aug 2025 16:15:44 -0400 Subject: [PATCH] Fix: Add method to merge conversation stats (#10667) Co-authored-by: openhands --- .../server/services/conversation_stats.py | 57 +++++++++ tests/unit/test_conversation_stats.py | 111 ++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/openhands/server/services/conversation_stats.py b/openhands/server/services/conversation_stats.py index ff72681db9..9f54f4a75d 100644 --- a/openhands/server/services/conversation_stats.py +++ b/openhands/server/services/conversation_stats.py @@ -101,3 +101,60 @@ class ConversationStats: del self.restored_metrics[service_id] self.service_to_metrics[service_id] = llm.metrics + + def merge_and_save(self, conversation_stats: 'ConversationStats'): + """ + Merge restored metrics from another ConversationStats into this one. + + Important: + - This method is intended to be used immediately after restoring metrics from + storage, before any LLM services are registered. In that state, only + `restored_metrics` should contain entries and `service_to_metrics` should + be empty. If either side has entries in `service_to_metrics`, we log an + error but continue execution. + + Behavior: + - Drop entries with zero accumulated_cost from both `restored_metrics` dicts + (self and incoming) before merging. + - Merge only `restored_metrics`. For duplicate keys, the incoming + `conversation_stats.restored_metrics` overwrites existing entries. + - Do NOT merge `service_to_metrics` here. + - Persist results by calling save_metrics(). + """ + + # If either side has active service metrics, log an error but proceed + if self.service_to_metrics or conversation_stats.service_to_metrics: + logger.error( + 'merge_and_save should be used only when service_to_metrics are empty; ' + 'found active service metrics during merge. Proceeding anyway.', + extra={ + 'conversation_id': self.conversation_id, + 'self_service_to_metrics_keys': list( + self.service_to_metrics.keys() + ), + 'incoming_service_to_metrics_keys': list( + conversation_stats.service_to_metrics.keys() + ), + }, + ) + + # Drop zero-cost entries from restored metrics only + def _drop_zero_cost(d: dict[str, Metrics]) -> None: + to_delete = [ + k for k, v in d.items() if getattr(v, 'accumulated_cost', 0) == 0 + ] + for k in to_delete: + del d[k] + + _drop_zero_cost(self.restored_metrics) + _drop_zero_cost(conversation_stats.restored_metrics) + + # Merge restored metrics, allowing incoming to overwrite + self.restored_metrics.update(conversation_stats.restored_metrics) + + # Save merged state + self.save_metrics() + logger.info( + 'Merged conversation stats', + extra={'conversation_id': self.conversation_id}, + ) diff --git a/tests/unit/test_conversation_stats.py b/tests/unit/test_conversation_stats.py index 59b656684d..ca6173087e 100644 --- a/tests/unit/test_conversation_stats.py +++ b/tests/unit/test_conversation_stats.py @@ -490,6 +490,117 @@ def test_save_and_restore_workflow(mock_file_store): assert llm.metrics.accumulated_token_usage.completion_tokens == 50 +def test_merge_conversation_stats_success_non_overlapping(mock_file_store): + """Merging two ConversationStats combines only restored metrics. Active metrics + (service_to_metrics) are not merged; if present, an error is logged but + execution continues. Incoming restored metrics overwrite duplicates. + """ + stats_a = ConversationStats( + file_store=mock_file_store, conversation_id='conv-merge-a', user_id='user-x' + ) + stats_b = ConversationStats( + file_store=mock_file_store, conversation_id='conv-merge-b', user_id='user-x' + ) + + # Self active + restored + m_a_active = Metrics(model_name='model-a') + m_a_active.add_cost(0.1) + m_a_restored = Metrics(model_name='model-a') + m_a_restored.add_cost(0.2) + stats_a.service_to_metrics['a-active'] = m_a_active + stats_a.restored_metrics = {'a-restored': m_a_restored} + + # Other active + restored + m_b_active = Metrics(model_name='model-b') + m_b_active.add_cost(0.3) + m_b_restored = Metrics(model_name='model-b') + m_b_restored.add_cost(0.4) + stats_b.service_to_metrics['b-active'] = m_b_active + stats_b.restored_metrics = {'b-restored': m_b_restored} + + # Merge B into A + stats_a.merge_and_save(stats_b) + + # Active metrics should not be merged; only self's active metrics remain + assert set(stats_a.service_to_metrics.keys()) == { + 'a-active', + } + + # Restored metrics from both sides should be in A's restored_metrics + assert set(stats_a.restored_metrics.keys()) == { + 'a-restored', + 'b-restored', + } + + # The exact Metrics objects should be present (no copies) + assert stats_a.service_to_metrics['a-active'] is m_a_active + assert stats_a.restored_metrics['a-restored'] is m_a_restored + assert stats_a.restored_metrics['b-restored'] is m_b_restored + + # Merge triggers a save; confirm the saved blob decodes to expected keys + # The save_metrics method combines both service_to_metrics and restored_metrics + encoded = mock_file_store.read(stats_a.metrics_path) + pickled = base64.b64decode(encoded) + restored_dict = pickle.loads(pickled) + assert set(restored_dict.keys()) == { + 'a-active', + 'a-restored', + 'b-restored', + } + + +@pytest.mark.parametrize( + 'self_side,other_side', + [ + ('active', 'active'), + ('restored', 'active'), + ('active', 'restored'), + ('restored', 'restored'), + ], +) +def test_merge_conversation_stats_duplicates_overwrite_and_log_errors( + mock_file_store, self_side, other_side +): + stats_a = ConversationStats( + file_store=mock_file_store, conversation_id='conv-merge-a', user_id='user-x' + ) + stats_b = ConversationStats( + file_store=mock_file_store, conversation_id='conv-merge-b', user_id='user-x' + ) + + # Place the same service id on the specified sides + dupe_id = 'dupe-service' + m1 = Metrics(model_name='m') + m1.add_cost(0.1) # ensure not dropped + m2 = Metrics(model_name='m') + m2.add_cost(0.2) # ensure not dropped + + if self_side == 'active': + stats_a.service_to_metrics[dupe_id] = m1 + else: + stats_a.restored_metrics[dupe_id] = m1 + + if other_side == 'active': + stats_b.service_to_metrics[dupe_id] = m2 + else: + stats_b.restored_metrics[dupe_id] = m2 + + # Perform merge; should not raise and should log error internally if active metrics present + stats_a.merge_and_save(stats_b) + + # Only restored metrics are merged; duplicates are allowed with incoming overwriting + if other_side == 'restored': + assert dupe_id in stats_a.restored_metrics + assert stats_a.restored_metrics[dupe_id] is m2 # incoming overwrites existing + else: + # No restored metric came from incoming; existing restored stays as-is + if self_side == 'restored': + assert dupe_id in stats_a.restored_metrics + assert stats_a.restored_metrics[dupe_id] is m1 + else: + assert dupe_id not in stats_a.restored_metrics + + def test_save_metrics_preserves_restored_metrics_fix(mock_file_store): """Test that save_metrics correctly preserves restored metrics for unregistered services.""" conversation_id = 'test-conversation-id'