OpenHands/enterprise/run_maintenance_tasks.py
2025-09-04 15:44:54 -04:00

79 lines
2.2 KiB
Python

import asyncio
from datetime import datetime, timedelta, timezone
from server.logger import logger
from storage.database import session_maker
from storage.maintenance_task import (
MaintenanceTask,
MaintenanceTaskStatus,
)
NUM_RETRIES = 3
RETRY_DELAY = 60
async def main():
try:
set_stale_task_error()
await run_tasks()
except Exception as e:
logger.info(f'Error running maintenance tasks: {e}')
def set_stale_task_error():
with session_maker() as session:
session.query(MaintenanceTask).filter(
MaintenanceTask.status == MaintenanceTaskStatus.WORKING,
MaintenanceTask.started_at
< datetime.now(timezone.utc) - timedelta(hours=1),
).update({MaintenanceTask.status: MaintenanceTaskStatus.ERROR})
session.commit()
async def run_tasks():
while True:
with session_maker() as session:
task = await next_task(session)
if not task:
return
# Update the status
task.status = MaintenanceTaskStatus.WORKING
task.updated_at = task.started_at = datetime.now(timezone.utc)
session.commit()
try:
processor = task.get_processor()
task.info = await processor(task)
task.status = MaintenanceTaskStatus.COMPLETED
session.commit()
except Exception as e:
task.info = {'error': str(e)}
task.status = MaintenanceTaskStatus.ERROR
session.commit()
# wait if there is a delay (this allows us to bypass throttling constraints)
if task.delay:
await asyncio.sleep(task.delay)
async def next_task(session) -> MaintenanceTask | None:
num_retries = NUM_RETRIES
while True:
task = (
session.query(MaintenanceTask)
.filter(MaintenanceTask.status == MaintenanceTaskStatus.PENDING)
.order_by(MaintenanceTask.created_at)
.first()
)
if task:
return task
task = next_task
num_retries -= 1
if num_retries < 0:
return None
if __name__ == '__main__':
asyncio.run(main())