diff --git a/enterprise/services/automation_executor.py b/enterprise/services/automation_executor.py index 81df8e04f2..a928836708 100644 --- a/enterprise/services/automation_executor.py +++ b/enterprise/services/automation_executor.py @@ -75,10 +75,8 @@ async def find_matching_automations( source_type = event.source_type payload = event.payload if payload is None: - logger.warning( - 'Event %s has None payload — defaulting to empty dict', event.id - ) - payload = {} + logger.error('Event %s has None payload — possible data corruption', event.id) + return [] if source_type in ('cron', 'manual'): automation_id = payload.get('automation_id') @@ -137,10 +135,10 @@ async def process_new_events(session: AsyncSession) -> int: event.status = 'PROCESSED' event.processed_at = utc_now() processed += 1 - except Exception: + except Exception as e: logger.exception('Error processing event %s', event.id) event.status = 'ERROR' - event.error_detail = 'Failed during event matching' + event.error_detail = f'Failed during event matching: {type(e).__name__}: {e}' event.processed_at = utc_now() if processed: @@ -215,6 +213,45 @@ async def _prepare_run( return api_key, automation_file +async def _monitor_conversation( + run: AutomationRun, + conversation_id: str, + api_client: OpenHandsAPIClient, + api_key: str, + session_factory: object, +) -> bool: + """Monitor a conversation until completion or timeout. + + Returns True if completed successfully, False if shutdown requested. + + Raises: + TimeoutError: If the run exceeds RUN_TIMEOUT_SECONDS. + """ + start_time = utc_now() + while should_continue(): + elapsed = (utc_now() - start_time).total_seconds() + if elapsed > RUN_TIMEOUT_SECONDS: + raise TimeoutError(f'Run exceeded {RUN_TIMEOUT_SECONDS}s timeout') + + await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) + + # Update heartbeat + async with session_factory() as session: + run_obj = await session.get(AutomationRun, run.id) + if run_obj: + run_obj.heartbeat_at = utc_now() + await session.commit() + + # Check conversation status + conversation = ( + await api_client.get_conversation(api_key, conversation_id) or {} + ) + if is_terminal(conversation): + return True + + return False # shutdown requested + + async def _submit_and_monitor( run: AutomationRun, api_key: str, @@ -247,37 +284,15 @@ async def _submit_and_monitor( await update_session.commit() # Monitor with heartbeats - start_time = utc_now() - shutdown_interrupted = False - while not is_terminal(conversation): - if not should_continue(): - logger.info('Shutdown requested, stopping run %s monitoring', run.id) - shutdown_interrupted = True - break - - elapsed = (utc_now() - start_time).total_seconds() - if elapsed > RUN_TIMEOUT_SECONDS: - raise TimeoutError( - f'Run {run.id} exceeded timeout of {RUN_TIMEOUT_SECONDS}s' - ) - - await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) - - async with session_factory() as hb_session: - run_obj = await hb_session.get(AutomationRun, run.id) - if run_obj: - run_obj.heartbeat_at = utc_now() - await hb_session.commit() - - conversation = ( - await api_client.get_conversation(api_key, conversation_id) or {} - ) + completed = await _monitor_conversation( + run, conversation_id, api_client, api_key, session_factory + ) # Update final status async with session_factory() as final_session: run_obj = await final_session.get(AutomationRun, run.id) if run_obj: - if shutdown_interrupted: + if not completed: # Leave as RUNNING — stale recovery will handle it if needed. # The conversation may still be running on the API side. logger.info( diff --git a/enterprise/tests/unit/services/test_automation_executor.py b/enterprise/tests/unit/services/test_automation_executor.py index ed24339706..666139ea14 100644 --- a/enterprise/tests/unit/services/test_automation_executor.py +++ b/enterprise/tests/unit/services/test_automation_executor.py @@ -539,3 +539,86 @@ def test_is_terminal_empty(): def test_is_terminal_case_insensitive(): assert is_terminal({'status': 'stopped'}) is True assert is_terminal({'status': 'Completed'}) is True + + +# --------------------------------------------------------------------------- +# find_matching_automations — None payload +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_find_matching_automations_none_payload(async_session): + """Events with None payload return empty list (data corruption guard).""" + event = make_event(source_type='cron') + event.payload = None + async_session.add(event) + await async_session.commit() + + result = await find_matching_automations(async_session, event) + + assert result == [] + + +# --------------------------------------------------------------------------- +# Integration: event → run creation → claim +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_integration_event_to_run_to_claim( + async_session_factory, +): + """Full flow: create event + automation → process_new_events → claim_and_execute_runs. + + Uses a real SQLite database; only the external API client is mocked. + """ + # 1. Seed an automation and a NEW event + async with async_session_factory() as session: + automation = make_automation(automation_id='integ-auto') + event = make_event( + source_type='cron', + payload={'automation_id': 'integ-auto'}, + dedup_key='integ-dedup', + ) + session.add_all([automation, event]) + await session.commit() + event_id = event.id + + # 2. Process inbox — should match and create a PENDING run + async with async_session_factory() as session: + processed = await process_new_events(session) + + assert processed == 1 + + # Verify event is PROCESSED and run was created + async with async_session_factory() as session: + evt = await session.get(AutomationEvent, event_id) + assert evt.status == 'PROCESSED' + + runs = (await session.execute(select(AutomationRun))).scalars().all() + assert len(runs) == 1 + run = runs[0] + assert run.automation_id == 'integ-auto' + assert run.status == 'PENDING' + assert run.event_payload == {'automation_id': 'integ-auto'} + + # 3. Claim the run — mock execute_run to avoid real API calls + api_client = AsyncMock() + + with patch('services.automation_executor.execute_run', new_callable=AsyncMock): + async with async_session_factory() as session: + claimed = await claim_and_execute_runs( + session, 'executor-integ', api_client, async_session_factory + ) + + assert claimed is True + + # 4. Verify the run moved to RUNNING with correct executor + async with async_session_factory() as session: + runs = (await session.execute(select(AutomationRun))).scalars().all() + assert len(runs) == 1 + run = runs[0] + assert run.status == 'RUNNING' + assert run.claimed_by == 'executor-integ' + assert run.started_at is not None + assert run.heartbeat_at is not None