[eval] yet another eval fixes on multi-processing (#3854)

Co-authored-by: Graham Neubig <neubig@gmail.com>
This commit is contained in:
Xingyao Wang 2024-09-13 10:51:22 -05:00 committed by GitHub
parent cea6b6e30e
commit 3a1b8c093b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 18 deletions

View File

@ -218,7 +218,7 @@ def initialize_runtime(
assert obs.exit_code == 0
action = CmdRunAction(command='source /swe_util/instance_swe_entry.sh')
action.timeout = 600
action.timeout = 1800
logger.info(action, extra={'msg_type': 'ACTION'})
obs = runtime.run_action(action)
logger.info(obs, extra={'msg_type': 'OBSERVATION'})

View File

@ -297,23 +297,47 @@ def run_evaluation(
try:
if use_multiprocessing:
with ProcessPoolExecutor(num_workers) as executor:
while not instance_queue.empty():
futures = []
for _ in range(min(num_workers, instance_queue.qsize())):
instance = instance_queue.get()
future = executor.submit(
process_instance,
instance,
metadata,
True,
process_instance_func,
)
future.add_done_callback(
lambda f, inst=instance: update_progress(f.result(), inst)
)
futures.append(future)
for future in futures:
future.result()
batch_futures = []
# Loop until there are *no more instances to be processed* and *all (in-progress) futures are done*
# since a running future may add new instances to the queue when error occurs
while not instance_queue.empty() or batch_futures:
# Submit new tasks if **there are instances to be processed** and **available workers**
while (
not instance_queue.empty() and len(batch_futures) < num_workers
):
try:
instance = instance_queue.get(block=False)
future = executor.submit(
process_instance,
instance,
metadata,
True,
process_instance_func,
)
future.add_done_callback(
lambda f, inst=instance: update_progress(
f.result(), inst
)
)
batch_futures.append(future)
except mp.queues.Empty:
logger.warning(
'Queue is empty - This should not happen. This is a bug.'
)
break # Queue is empty, stop submitting new tasks
# Continue to wait for the futures to be done & remove completed futures
batch_futures = [f for f in batch_futures if not f.done()]
# Short sleep to prevent busy-waiting
time.sleep(1)
# Ensure all futures are done
assert instance_queue.empty(), 'instance_queue should be empty after all futures are done. This is a bug.'
assert (
len(batch_futures) == 0
), 'batch_futures should be empty after all futures are done. This is a bug.'
else:
while not instance_queue.empty():
instance = instance_queue.get()