diff --git a/evaluation/swe_bench/run_infer.py b/evaluation/swe_bench/run_infer.py index 7f228a5d1f..ff846ed1a8 100644 --- a/evaluation/swe_bench/run_infer.py +++ b/evaluation/swe_bench/run_infer.py @@ -218,7 +218,7 @@ def initialize_runtime( assert obs.exit_code == 0 action = CmdRunAction(command='source /swe_util/instance_swe_entry.sh') - action.timeout = 600 + action.timeout = 1800 logger.info(action, extra={'msg_type': 'ACTION'}) obs = runtime.run_action(action) logger.info(obs, extra={'msg_type': 'OBSERVATION'}) diff --git a/evaluation/utils/shared.py b/evaluation/utils/shared.py index 5306881d26..972698659b 100644 --- a/evaluation/utils/shared.py +++ b/evaluation/utils/shared.py @@ -297,23 +297,47 @@ def run_evaluation( try: if use_multiprocessing: with ProcessPoolExecutor(num_workers) as executor: - while not instance_queue.empty(): - futures = [] - for _ in range(min(num_workers, instance_queue.qsize())): - instance = instance_queue.get() - future = executor.submit( - process_instance, - instance, - metadata, - True, - process_instance_func, - ) - future.add_done_callback( - lambda f, inst=instance: update_progress(f.result(), inst) - ) - futures.append(future) - for future in futures: - future.result() + batch_futures = [] + + # Loop until there are *no more instances to be processed* and *all (in-progress) futures are done* + # since a running future may add new instances to the queue when error occurs + while not instance_queue.empty() or batch_futures: + # Submit new tasks if **there are instances to be processed** and **available workers** + while ( + not instance_queue.empty() and len(batch_futures) < num_workers + ): + try: + instance = instance_queue.get(block=False) + future = executor.submit( + process_instance, + instance, + metadata, + True, + process_instance_func, + ) + future.add_done_callback( + lambda f, inst=instance: update_progress( + f.result(), inst + ) + ) + batch_futures.append(future) + except mp.queues.Empty: + logger.warning( + 'Queue is empty - This should not happen. This is a bug.' + ) + break # Queue is empty, stop submitting new tasks + + # Continue to wait for the futures to be done & remove completed futures + batch_futures = [f for f in batch_futures if not f.done()] + + # Short sleep to prevent busy-waiting + time.sleep(1) + + # Ensure all futures are done + assert instance_queue.empty(), 'instance_queue should be empty after all futures are done. This is a bug.' + assert ( + len(batch_futures) == 0 + ), 'batch_futures should be empty after all futures are done. This is a bug.' else: while not instance_queue.empty(): instance = instance_queue.get()