mirror of
https://github.com/OpenHands/OpenHands.git
synced 2025-12-26 05:48:36 +08:00
Fix: Add method to merge conversation stats (#10667)
Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
parent
6a544d4274
commit
6eb32e9ae4
@ -101,3 +101,60 @@ class ConversationStats:
|
||||
del self.restored_metrics[service_id]
|
||||
|
||||
self.service_to_metrics[service_id] = llm.metrics
|
||||
|
||||
def merge_and_save(self, conversation_stats: 'ConversationStats'):
|
||||
"""
|
||||
Merge restored metrics from another ConversationStats into this one.
|
||||
|
||||
Important:
|
||||
- This method is intended to be used immediately after restoring metrics from
|
||||
storage, before any LLM services are registered. In that state, only
|
||||
`restored_metrics` should contain entries and `service_to_metrics` should
|
||||
be empty. If either side has entries in `service_to_metrics`, we log an
|
||||
error but continue execution.
|
||||
|
||||
Behavior:
|
||||
- Drop entries with zero accumulated_cost from both `restored_metrics` dicts
|
||||
(self and incoming) before merging.
|
||||
- Merge only `restored_metrics`. For duplicate keys, the incoming
|
||||
`conversation_stats.restored_metrics` overwrites existing entries.
|
||||
- Do NOT merge `service_to_metrics` here.
|
||||
- Persist results by calling save_metrics().
|
||||
"""
|
||||
|
||||
# If either side has active service metrics, log an error but proceed
|
||||
if self.service_to_metrics or conversation_stats.service_to_metrics:
|
||||
logger.error(
|
||||
'merge_and_save should be used only when service_to_metrics are empty; '
|
||||
'found active service metrics during merge. Proceeding anyway.',
|
||||
extra={
|
||||
'conversation_id': self.conversation_id,
|
||||
'self_service_to_metrics_keys': list(
|
||||
self.service_to_metrics.keys()
|
||||
),
|
||||
'incoming_service_to_metrics_keys': list(
|
||||
conversation_stats.service_to_metrics.keys()
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
# Drop zero-cost entries from restored metrics only
|
||||
def _drop_zero_cost(d: dict[str, Metrics]) -> None:
|
||||
to_delete = [
|
||||
k for k, v in d.items() if getattr(v, 'accumulated_cost', 0) == 0
|
||||
]
|
||||
for k in to_delete:
|
||||
del d[k]
|
||||
|
||||
_drop_zero_cost(self.restored_metrics)
|
||||
_drop_zero_cost(conversation_stats.restored_metrics)
|
||||
|
||||
# Merge restored metrics, allowing incoming to overwrite
|
||||
self.restored_metrics.update(conversation_stats.restored_metrics)
|
||||
|
||||
# Save merged state
|
||||
self.save_metrics()
|
||||
logger.info(
|
||||
'Merged conversation stats',
|
||||
extra={'conversation_id': self.conversation_id},
|
||||
)
|
||||
|
||||
@ -490,6 +490,117 @@ def test_save_and_restore_workflow(mock_file_store):
|
||||
assert llm.metrics.accumulated_token_usage.completion_tokens == 50
|
||||
|
||||
|
||||
def test_merge_conversation_stats_success_non_overlapping(mock_file_store):
|
||||
"""Merging two ConversationStats combines only restored metrics. Active metrics
|
||||
(service_to_metrics) are not merged; if present, an error is logged but
|
||||
execution continues. Incoming restored metrics overwrite duplicates.
|
||||
"""
|
||||
stats_a = ConversationStats(
|
||||
file_store=mock_file_store, conversation_id='conv-merge-a', user_id='user-x'
|
||||
)
|
||||
stats_b = ConversationStats(
|
||||
file_store=mock_file_store, conversation_id='conv-merge-b', user_id='user-x'
|
||||
)
|
||||
|
||||
# Self active + restored
|
||||
m_a_active = Metrics(model_name='model-a')
|
||||
m_a_active.add_cost(0.1)
|
||||
m_a_restored = Metrics(model_name='model-a')
|
||||
m_a_restored.add_cost(0.2)
|
||||
stats_a.service_to_metrics['a-active'] = m_a_active
|
||||
stats_a.restored_metrics = {'a-restored': m_a_restored}
|
||||
|
||||
# Other active + restored
|
||||
m_b_active = Metrics(model_name='model-b')
|
||||
m_b_active.add_cost(0.3)
|
||||
m_b_restored = Metrics(model_name='model-b')
|
||||
m_b_restored.add_cost(0.4)
|
||||
stats_b.service_to_metrics['b-active'] = m_b_active
|
||||
stats_b.restored_metrics = {'b-restored': m_b_restored}
|
||||
|
||||
# Merge B into A
|
||||
stats_a.merge_and_save(stats_b)
|
||||
|
||||
# Active metrics should not be merged; only self's active metrics remain
|
||||
assert set(stats_a.service_to_metrics.keys()) == {
|
||||
'a-active',
|
||||
}
|
||||
|
||||
# Restored metrics from both sides should be in A's restored_metrics
|
||||
assert set(stats_a.restored_metrics.keys()) == {
|
||||
'a-restored',
|
||||
'b-restored',
|
||||
}
|
||||
|
||||
# The exact Metrics objects should be present (no copies)
|
||||
assert stats_a.service_to_metrics['a-active'] is m_a_active
|
||||
assert stats_a.restored_metrics['a-restored'] is m_a_restored
|
||||
assert stats_a.restored_metrics['b-restored'] is m_b_restored
|
||||
|
||||
# Merge triggers a save; confirm the saved blob decodes to expected keys
|
||||
# The save_metrics method combines both service_to_metrics and restored_metrics
|
||||
encoded = mock_file_store.read(stats_a.metrics_path)
|
||||
pickled = base64.b64decode(encoded)
|
||||
restored_dict = pickle.loads(pickled)
|
||||
assert set(restored_dict.keys()) == {
|
||||
'a-active',
|
||||
'a-restored',
|
||||
'b-restored',
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'self_side,other_side',
|
||||
[
|
||||
('active', 'active'),
|
||||
('restored', 'active'),
|
||||
('active', 'restored'),
|
||||
('restored', 'restored'),
|
||||
],
|
||||
)
|
||||
def test_merge_conversation_stats_duplicates_overwrite_and_log_errors(
|
||||
mock_file_store, self_side, other_side
|
||||
):
|
||||
stats_a = ConversationStats(
|
||||
file_store=mock_file_store, conversation_id='conv-merge-a', user_id='user-x'
|
||||
)
|
||||
stats_b = ConversationStats(
|
||||
file_store=mock_file_store, conversation_id='conv-merge-b', user_id='user-x'
|
||||
)
|
||||
|
||||
# Place the same service id on the specified sides
|
||||
dupe_id = 'dupe-service'
|
||||
m1 = Metrics(model_name='m')
|
||||
m1.add_cost(0.1) # ensure not dropped
|
||||
m2 = Metrics(model_name='m')
|
||||
m2.add_cost(0.2) # ensure not dropped
|
||||
|
||||
if self_side == 'active':
|
||||
stats_a.service_to_metrics[dupe_id] = m1
|
||||
else:
|
||||
stats_a.restored_metrics[dupe_id] = m1
|
||||
|
||||
if other_side == 'active':
|
||||
stats_b.service_to_metrics[dupe_id] = m2
|
||||
else:
|
||||
stats_b.restored_metrics[dupe_id] = m2
|
||||
|
||||
# Perform merge; should not raise and should log error internally if active metrics present
|
||||
stats_a.merge_and_save(stats_b)
|
||||
|
||||
# Only restored metrics are merged; duplicates are allowed with incoming overwriting
|
||||
if other_side == 'restored':
|
||||
assert dupe_id in stats_a.restored_metrics
|
||||
assert stats_a.restored_metrics[dupe_id] is m2 # incoming overwrites existing
|
||||
else:
|
||||
# No restored metric came from incoming; existing restored stays as-is
|
||||
if self_side == 'restored':
|
||||
assert dupe_id in stats_a.restored_metrics
|
||||
assert stats_a.restored_metrics[dupe_id] is m1
|
||||
else:
|
||||
assert dupe_id not in stats_a.restored_metrics
|
||||
|
||||
|
||||
def test_save_metrics_preserves_restored_metrics_fix(mock_file_store):
|
||||
"""Test that save_metrics correctly preserves restored metrics for unregistered services."""
|
||||
conversation_id = 'test-conversation-id'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user