fix: include real errors in event detail, handle None payload explicitly, extract monitoring helper, add integration test

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
openhands
2026-03-11 17:54:51 +00:00
parent d57e6c134c
commit ff5274f60b
2 changed files with 130 additions and 32 deletions

View File

@@ -75,10 +75,8 @@ async def find_matching_automations(
source_type = event.source_type
payload = event.payload
if payload is None:
logger.warning(
'Event %s has None payload — defaulting to empty dict', event.id
)
payload = {}
logger.error('Event %s has None payload — possible data corruption', event.id)
return []
if source_type in ('cron', 'manual'):
automation_id = payload.get('automation_id')
@@ -137,10 +135,10 @@ async def process_new_events(session: AsyncSession) -> int:
event.status = 'PROCESSED'
event.processed_at = utc_now()
processed += 1
except Exception:
except Exception as e:
logger.exception('Error processing event %s', event.id)
event.status = 'ERROR'
event.error_detail = 'Failed during event matching'
event.error_detail = f'Failed during event matching: {type(e).__name__}: {e}'
event.processed_at = utc_now()
if processed:
@@ -215,6 +213,45 @@ async def _prepare_run(
return api_key, automation_file
async def _monitor_conversation(
run: AutomationRun,
conversation_id: str,
api_client: OpenHandsAPIClient,
api_key: str,
session_factory: object,
) -> bool:
"""Monitor a conversation until completion or timeout.
Returns True if completed successfully, False if shutdown requested.
Raises:
TimeoutError: If the run exceeds RUN_TIMEOUT_SECONDS.
"""
start_time = utc_now()
while should_continue():
elapsed = (utc_now() - start_time).total_seconds()
if elapsed > RUN_TIMEOUT_SECONDS:
raise TimeoutError(f'Run exceeded {RUN_TIMEOUT_SECONDS}s timeout')
await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
# Update heartbeat
async with session_factory() as session:
run_obj = await session.get(AutomationRun, run.id)
if run_obj:
run_obj.heartbeat_at = utc_now()
await session.commit()
# Check conversation status
conversation = (
await api_client.get_conversation(api_key, conversation_id) or {}
)
if is_terminal(conversation):
return True
return False # shutdown requested
async def _submit_and_monitor(
run: AutomationRun,
api_key: str,
@@ -247,37 +284,15 @@ async def _submit_and_monitor(
await update_session.commit()
# Monitor with heartbeats
start_time = utc_now()
shutdown_interrupted = False
while not is_terminal(conversation):
if not should_continue():
logger.info('Shutdown requested, stopping run %s monitoring', run.id)
shutdown_interrupted = True
break
elapsed = (utc_now() - start_time).total_seconds()
if elapsed > RUN_TIMEOUT_SECONDS:
raise TimeoutError(
f'Run {run.id} exceeded timeout of {RUN_TIMEOUT_SECONDS}s'
)
await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
async with session_factory() as hb_session:
run_obj = await hb_session.get(AutomationRun, run.id)
if run_obj:
run_obj.heartbeat_at = utc_now()
await hb_session.commit()
conversation = (
await api_client.get_conversation(api_key, conversation_id) or {}
)
completed = await _monitor_conversation(
run, conversation_id, api_client, api_key, session_factory
)
# Update final status
async with session_factory() as final_session:
run_obj = await final_session.get(AutomationRun, run.id)
if run_obj:
if shutdown_interrupted:
if not completed:
# Leave as RUNNING — stale recovery will handle it if needed.
# The conversation may still be running on the API side.
logger.info(

View File

@@ -539,3 +539,86 @@ def test_is_terminal_empty():
def test_is_terminal_case_insensitive():
assert is_terminal({'status': 'stopped'}) is True
assert is_terminal({'status': 'Completed'}) is True
# ---------------------------------------------------------------------------
# find_matching_automations — None payload
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_find_matching_automations_none_payload(async_session):
"""Events with None payload return empty list (data corruption guard)."""
event = make_event(source_type='cron')
event.payload = None
async_session.add(event)
await async_session.commit()
result = await find_matching_automations(async_session, event)
assert result == []
# ---------------------------------------------------------------------------
# Integration: event → run creation → claim
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_integration_event_to_run_to_claim(
async_session_factory,
):
"""Full flow: create event + automation → process_new_events → claim_and_execute_runs.
Uses a real SQLite database; only the external API client is mocked.
"""
# 1. Seed an automation and a NEW event
async with async_session_factory() as session:
automation = make_automation(automation_id='integ-auto')
event = make_event(
source_type='cron',
payload={'automation_id': 'integ-auto'},
dedup_key='integ-dedup',
)
session.add_all([automation, event])
await session.commit()
event_id = event.id
# 2. Process inbox — should match and create a PENDING run
async with async_session_factory() as session:
processed = await process_new_events(session)
assert processed == 1
# Verify event is PROCESSED and run was created
async with async_session_factory() as session:
evt = await session.get(AutomationEvent, event_id)
assert evt.status == 'PROCESSED'
runs = (await session.execute(select(AutomationRun))).scalars().all()
assert len(runs) == 1
run = runs[0]
assert run.automation_id == 'integ-auto'
assert run.status == 'PENDING'
assert run.event_payload == {'automation_id': 'integ-auto'}
# 3. Claim the run — mock execute_run to avoid real API calls
api_client = AsyncMock()
with patch('services.automation_executor.execute_run', new_callable=AsyncMock):
async with async_session_factory() as session:
claimed = await claim_and_execute_runs(
session, 'executor-integ', api_client, async_session_factory
)
assert claimed is True
# 4. Verify the run moved to RUNNING with correct executor
async with async_session_factory() as session:
runs = (await session.execute(select(AutomationRun))).scalars().all()
assert len(runs) == 1
run = runs[0]
assert run.status == 'RUNNING'
assert run.claimed_by == 'executor-integ'
assert run.started_at is not None
assert run.heartbeat_at is not None