Add lookup attribution logs for V0 runtime spikes

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands
2026-03-15 18:09:06 +00:00
parent f7ca32126f
commit 4ce07632ae
7 changed files with 179 additions and 5 deletions

View File

@@ -180,6 +180,16 @@ Each integration follows a consistent pattern with service classes, storage mode
- Use `patch` with correct import paths (e.g., `telemetry.registry.logger` not `enterprise.telemetry.registry.logger`)
- Test both success and failure scenarios with proper error handling
### Legacy V0 runtime lookup spikes
- Production currently points `CONVERSATION_MANAGER_CLASS` at `server.saas_nested_conversation_manager.SaasNestedConversationManager`, so unexpected runtime-api `GET /sessions/{sid}` bursts are most likely coming from legacy V0 conversation-manager lookups rather than V1 app-server paths.
- When investigating legacy lookup spikes, the most useful attribution points are:
- `enterprise/server/saas_nested_conversation_manager.py` singleton `get_agent_loop_info(filter_to_sids={sid})` and direct `_get_runtime(sid)` callers
- `enterprise/server/routes/event_webhook.py` session API key checks
- `enterprise/integrations/utils.py` event-store lookups
- `openhands/server/routes/manage_conversations.py` legacy `GET /api/conversations/{conversation_id}`
- A lightweight shared helper lives at `openhands/utils/log_deduper.py` for periodic structured summary logs, which is safer than logging every lookup during an incident.
**Coverage Goals:**
- Aim for 90%+ test coverage on new enterprise modules
- Focus on critical business logic and error handling paths

View File

@@ -21,6 +21,7 @@ from openhands.events.event_store_abc import EventStoreABC
from openhands.events.observation.agent import AgentStateChangedObservation
from openhands.integrations.service_types import Repository
from openhands.storage.data_models.conversation_status import ConversationStatus
from openhands.utils.log_deduper import record_periodic_lookup_summary
if TYPE_CHECKING:
from openhands.server.conversation_manager.conversation_manager import (
@@ -366,6 +367,15 @@ def extract_summary_from_event_store(
async def get_event_store_from_conversation_manager(
conversation_manager: ConversationManager, conversation_id: str
) -> EventStoreABC:
summary = record_periodic_lookup_summary(
('legacy_v0_lookup_entrypoint', 'integration_get_event_store'),
conversation_id,
)
if summary:
logger.info(
'legacy_v0_lookup_entrypoint_summary',
extra={'lookup_source': 'integration_get_event_store', **summary},
)
agent_loop_infos = await conversation_manager.get_agent_loop_info(
filter_to_sids={conversation_id}
)

View File

@@ -625,7 +625,9 @@ async def refresh_tokens(
) -> TokenResponse:
"""Return the latest token for a given provider."""
user_id = _get_user_id(sid)
session_api_key = await _get_session_api_key(user_id, sid)
session_api_key = await _get_session_api_key(
user_id, sid, lookup_source='auth_refresh_tokens'
)
if session_api_key != x_session_api_key:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Forbidden')

View File

@@ -24,6 +24,7 @@ from storage.database import session_maker
from storage.stored_conversation_metadata_saas import StoredConversationMetadataSaas
from openhands.server.shared import conversation_manager
from openhands.utils.log_deduper import record_periodic_lookup_summary
event_webhook_router = APIRouter(prefix='/event-webhook')
@@ -80,7 +81,11 @@ async def _process_batch_operations_background(
# If the conversation id changes, then we must recheck the session_api_key
if conversation_id != prev_conversation_id:
user_id = _get_user_id(conversation_id)
session_api_key = await _get_session_api_key(user_id, conversation_id)
session_api_key = await _get_session_api_key(
user_id,
conversation_id,
lookup_source='event_webhook_batch_session_api_key',
)
prev_conversation_id = conversation_id
if session_api_key != x_session_api_key:
logger.error(
@@ -178,7 +183,11 @@ async def on_write(
user_id = _get_user_id(conversation_id)
# Check the session API key to make sure this is from the correct conversation
session_api_key = await _get_session_api_key(user_id, conversation_id)
session_api_key = await _get_session_api_key(
user_id,
conversation_id,
lookup_source='event_webhook_write_session_api_key',
)
if session_api_key != x_session_api_key:
return Response(status_code=status.HTTP_403_FORBIDDEN)
@@ -240,7 +249,21 @@ def _get_user_id(conversation_id: str) -> str:
return str(conversation_metadata_saas.user_id)
async def _get_session_api_key(user_id: str, conversation_id: str) -> str | None:
async def _get_session_api_key(
user_id: str,
conversation_id: str,
lookup_source: str = 'event_webhook_session_api_key',
) -> str | None:
summary = record_periodic_lookup_summary(
('legacy_v0_lookup_entrypoint', lookup_source),
conversation_id,
user_id,
)
if summary:
logger.info(
'legacy_v0_lookup_entrypoint_summary',
extra={'lookup_source': lookup_source, **summary},
)
agent_loop_info = await conversation_manager.get_agent_loop_info(
user_id, filter_to_sids={conversation_id}
)

View File

@@ -61,6 +61,7 @@ from openhands.storage.locations import (
)
from openhands.utils.http_session import httpx_verify_option
from openhands.utils.import_utils import get_impl
from openhands.utils.log_deduper import record_periodic_lookup_summary
from openhands.utils.shutdown_listener import should_continue
from openhands.utils.utils import create_registry_and_conversation_stats
@@ -172,8 +173,23 @@ class SaasNestedConversationManager(ConversationManager):
return conversation_ids
def _maybe_log_runtime_lookup_summary(
    self, lookup_source: str, sid: str, user_id: str | None = None
) -> None:
    """Record one runtime lookup and, at most periodically, log an aggregated summary.

    The helper delegates counting/sampling to ``record_periodic_lookup_summary``;
    that call returns a payload only when a flush interval has elapsed, so a log
    line is emitted at most once per interval per ``lookup_source``.
    """
    payload = record_periodic_lookup_summary(
        ('saas_nested_runtime_lookup', lookup_source),
        sid,
        user_id,
    )
    if payload is None:
        return
    logger.info(
        'saas_nested_runtime_lookup_summary',
        extra={'lookup_source': lookup_source, **payload},
    )
async def is_agent_loop_running(self, sid: str) -> bool:
"""Check if an agent loop is running for the given session ID."""
self._maybe_log_runtime_lookup_summary('is_agent_loop_running', sid)
runtime = await self._get_runtime(sid)
if runtime is None:
return False
@@ -200,6 +216,7 @@ class SaasNestedConversationManager(ConversationManager):
key = self._get_redis_conversation_key(user_id, sid)
starting = await redis.get(key)
self._maybe_log_runtime_lookup_summary('maybe_start_agent_loop', sid, user_id)
runtime = await self._get_runtime(sid)
nested_url = None
@@ -576,6 +593,7 @@ class SaasNestedConversationManager(ConversationManager):
raise ValueError('unsupported_operation')
async def send_event_to_conversation(self, sid: str, data: dict):
self._maybe_log_runtime_lookup_summary('send_event_to_conversation', sid)
runtime = await self._get_runtime(sid)
if runtime is None:
raise ValueError(f'no_such_conversation:{sid}')
@@ -595,6 +613,7 @@ class SaasNestedConversationManager(ConversationManager):
async def close_session(self, sid: str):
logger.info('close_session', extra={'sid': sid})
self._maybe_log_runtime_lookup_summary('close_session', sid)
runtime = await self._get_runtime(sid)
if runtime is None:
logger.info('no_session_to_close', extra={'sid': sid})
@@ -708,7 +727,11 @@ class SaasNestedConversationManager(ConversationManager):
# Get running agent loops from runtime api
if filter_to_sids and len(filter_to_sids) == 1:
runtimes = []
runtime = await self._get_runtime(next(iter(filter_to_sids)))
sid = next(iter(filter_to_sids))
self._maybe_log_runtime_lookup_summary(
'get_agent_loop_info_singleton', sid, user_id
)
runtime = await self._get_runtime(sid)
if runtime:
runtimes.append(runtime)
else:
@@ -1128,6 +1151,8 @@ class SaasNestedConversationManager(ConversationManager):
ValueError: If the conversation is not running.
httpx.HTTPError: If there's an error communicating with the nested runtime.
"""
self._maybe_log_runtime_lookup_summary('list_files', sid)
runtime = await self._get_runtime(sid)
if runtime is None or runtime.get('status') != 'running':
raise ValueError(f'Conversation {sid} is not running')
@@ -1146,7 +1171,9 @@ class SaasNestedConversationManager(ConversationManager):
ValueError: If the conversation is not running.
httpx.HTTPError: If there's an error communicating with the nested runtime.
"""
self._maybe_log_runtime_lookup_summary('select_file', sid)
runtime = await self._get_runtime(sid)
if runtime is None or runtime.get('status') != 'running':
raise ValueError(f'Conversation {sid} is not running')
@@ -1166,6 +1193,7 @@ class SaasNestedConversationManager(ConversationManager):
ValueError: If the conversation is not running.
httpx.HTTPError: If there's an error communicating with the nested runtime.
"""
self._maybe_log_runtime_lookup_summary('upload_files', sid)
runtime = await self._get_runtime(sid)
if runtime is None or runtime.get('status') != 'running':
raise ValueError(f'Conversation {sid} is not running')

View File

@@ -112,6 +112,7 @@ from openhands.storage.settings.settings_store import SettingsStore
from openhands.utils.async_utils import wait_all
from openhands.utils.conversation_summary import get_default_conversation_title
from openhands.utils.environment import get_effective_llm_base_url
from openhands.utils.log_deduper import record_periodic_lookup_summary
app = APIRouter(prefix='/api', dependencies=get_dependencies())
app_conversation_service_dependency = depends_app_conversation_service()
@@ -533,6 +534,18 @@ async def get_conversation(
num_connections = len(
await conversation_manager.get_connections(filter_to_sids={conversation_id})
)
summary = record_periodic_lookup_summary(
('legacy_v0_lookup_entrypoint', 'manage_conversations_get_conversation'),
conversation_id,
)
if summary:
logger.info(
'legacy_v0_lookup_entrypoint_summary',
extra={
'lookup_source': 'manage_conversations_get_conversation',
**summary,
},
)
agent_loop_infos = await conversation_manager.get_agent_loop_info(
filter_to_sids={conversation_id}
)

View File

@@ -0,0 +1,88 @@
from __future__ import annotations

import threading
import time
from collections.abc import Hashable
from dataclasses import dataclass, field
from typing import Any

# Timestamps (time.monotonic) at which each dedup key last emitted a log.
_RECENT_LOG_KEYS: dict[Hashable, float] = {}
# Per-key aggregation buckets for periodic lookup summaries.
_LOOKUP_SUMMARIES: dict[Hashable, 'LookupSummaryState'] = {}
# Single lock guarding both module-level maps; all state mutation happens
# under it, so the helpers are safe to call from multiple threads.
_LOCK = threading.Lock()
# Hard upper bound on tracked dedup keys so memory stays bounded.
_MAX_KEYS = 10000
_DEFAULT_MAX_SAMPLES = 10


@dataclass
class LookupSummaryState:
    """Mutable aggregation bucket for one periodic-summary key."""

    # Number of lookups recorded since the last flush.
    count: int = 0
    # Up to ``max_samples`` distinct conversation ids seen this window.
    sample_conversation_ids: list[str] = field(default_factory=list)
    # Up to ``max_samples`` distinct (truthy) user ids seen this window.
    sample_user_ids: list[str] = field(default_factory=list)
    # time.monotonic() of the last flush (or of bucket creation).
    last_emitted_at: float = 0.0


def should_emit_deduped_log(key: Hashable, ttl_seconds: float = 300.0) -> bool:
    """Return True if this log key has not been emitted recently.

    Args:
        key: Hashable identity of the log line to dedupe.
        ttl_seconds: Minimum interval between True results for the same key.

    Returns:
        True when the caller should emit the log line, False when the same
        key already emitted within the last ``ttl_seconds``.
    """
    now = time.monotonic()
    with _LOCK:
        last_seen = _RECENT_LOG_KEYS.get(key)
        if last_seen is not None and now - last_seen < ttl_seconds:
            return False
        _RECENT_LOG_KEYS[key] = now
        if len(_RECENT_LOG_KEYS) > _MAX_KEYS:
            cutoff = now - ttl_seconds
            stale_keys = [
                existing_key
                for existing_key, existing_last_seen in _RECENT_LOG_KEYS.items()
                if existing_last_seen < cutoff
            ]
            for stale_key in stale_keys:
                _RECENT_LOG_KEYS.pop(stale_key, None)
            # Fix: the stale sweep alone cannot enforce _MAX_KEYS when every
            # entry is still fresh, which let the map grow without bound under
            # a sustained stream of unique keys. Evict the oldest entries to
            # restore the hard cap; an evicted key may re-emit early, which is
            # an acceptable trade-off for a strict memory bound.
            overflow = len(_RECENT_LOG_KEYS) - _MAX_KEYS
            if overflow > 0:
                oldest_first = sorted(
                    _RECENT_LOG_KEYS, key=_RECENT_LOG_KEYS.__getitem__
                )
                for old_key in oldest_first[:overflow]:
                    _RECENT_LOG_KEYS.pop(old_key, None)
        return True


def record_periodic_lookup_summary(
    key: Hashable,
    conversation_id: str,
    user_id: str | None = None,
    flush_interval_seconds: float = 60.0,
    max_samples: int = _DEFAULT_MAX_SAMPLES,
) -> dict[str, Any] | None:
    """Aggregate frequent lookup activity and return a periodic summary payload.

    Every call increments the counter for ``key`` and records sample ids
    (deduplicated, capped at ``max_samples``). At most once per
    ``flush_interval_seconds`` the accumulated window is returned as a payload
    dict and the bucket is reset; all other calls return None, which keeps log
    volume low during lookup spikes.

    Args:
        key: Hashable identity of the aggregation bucket (e.g. a
            ``(category, source)`` tuple). Expected to be low-cardinality;
            buckets are never evicted.
        conversation_id: Conversation id to sample for this lookup.
        user_id: Optional user id to sample; falsy values are ignored.
        flush_interval_seconds: Minimum seconds between returned payloads.
        max_samples: Cap on distinct sampled ids per window.

    Returns:
        None between flushes; otherwise a dict with ``lookup_count``,
        ``sample_conversation_ids`` and, when any were recorded,
        ``sample_user_ids``.
    """
    now = time.monotonic()
    with _LOCK:
        summary = _LOOKUP_SUMMARIES.get(key)
        if summary is None:
            # A fresh bucket starts its window at creation time, so nothing
            # is emitted until a full interval has elapsed (unless the
            # interval is zero).
            summary = LookupSummaryState(last_emitted_at=now)
            _LOOKUP_SUMMARIES[key] = summary
        summary.count += 1
        if (
            conversation_id not in summary.sample_conversation_ids
            and len(summary.sample_conversation_ids) < max_samples
        ):
            summary.sample_conversation_ids.append(conversation_id)
        if (
            user_id
            and user_id not in summary.sample_user_ids
            and len(summary.sample_user_ids) < max_samples
        ):
            summary.sample_user_ids.append(user_id)
        if now - summary.last_emitted_at < flush_interval_seconds:
            return None
        payload: dict[str, Any] = {
            'lookup_count': summary.count,
            'sample_conversation_ids': summary.sample_conversation_ids.copy(),
        }
        if summary.sample_user_ids:
            payload['sample_user_ids'] = summary.sample_user_ids.copy()
        summary.count = 0
        summary.sample_conversation_ids.clear()
        summary.sample_user_ids.clear()
        summary.last_emitted_at = now
        return payload