diff --git a/evaluation/EDA/run_infer.py b/evaluation/EDA/run_infer.py index a7cd1f10ca..bf106f30ce 100644 --- a/evaluation/EDA/run_infer.py +++ b/evaluation/EDA/run_infer.py @@ -1,18 +1,21 @@ import asyncio -import json import logging import multiprocessing as mp import os -import pathlib -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor + +import pandas as pd # import huggingface_hub from datasets import load_dataset -from tqdm import tqdm from evaluation.EDA.game import Q20Game, Q20GameCelebrity +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent # from evaluation.EDA.scorer import question_scorer @@ -36,7 +39,7 @@ def cleanup(): process.join() -def codeact_user_response(state: State) -> str: +def codeact_user_response_eda(state: State) -> str: global game model_guess = '' if state.history: @@ -54,12 +57,8 @@ def codeact_user_response(state: State) -> str: return msg -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { - 'CodeActAgent': codeact_user_response, + 'CodeActAgent': codeact_user_response_eda, 'MonologueAgent': monologue_user_response, } @@ -69,10 +68,14 @@ AGENT_CLS_TO_INST_SUFFIX = { def process_instance( - agent: Agent, instance, metadata, openai_api_key, reset_logger: bool = True + instance: pd.Series, + metadata: EvalMetadata, + reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) # Setup the logger properly, so you can run multi-processing to parallelize the evaluation - eval_output_dir = metadata['eval_output_dir'] + eval_output_dir = metadata.eval_output_dir if reset_logger: # Set up logger log_file = os.path.join( @@ -107,12 +110,14 @@ def process_instance( # Use codeactagent as guesser_model global game - game = _game_class[metadata['dataset']]( + assert metadata.dataset is not None + assert metadata.details is not None + game = _game_class[metadata.dataset]( item=instance['text'].strip(), - answerer_model=metadata['answerer_model'], + answerer_model=metadata.details['answerer_model'], guesser_model=None, - num_turns=metadata['max_iterations'], - openai_api_key=openai_api_key, + num_turns=metadata.max_iterations, + openai_api_key=metadata.details['openai_api_key'], guesser_kargs=guesser_kargs, ) @@ -195,151 +200,42 @@ if __name__ == '__main__': help='data split, eg, test', ) args, _ = parser.parse_known_args() - if args.directory: - config.workspace_base = os.path.abspath(args.directory) - print(f'Setting workspace base to {config.workspace_base}') - # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing - # so we don't need to manage file uploading to OpenDevin's repo - eda_dataset = load_dataset( - 'yizheapple/entity-deduction-arena', name=args.dataset, split=args.data_split - ) - logger.info( - f'Evaluating Entity Deduction Arena {args.dataset} {args.data_split} split' - ) - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` if args.llm_config: specified_llm_config = get_llm_config_arg(args.llm_config) if specified_llm_config: config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - # TEST METADATA - agent_class = 
args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + eda_dataset = load_dataset( + 'yizheapple/entity-deduction-arena', name=args.dataset, split=args.data_split + ) + + metadata = make_metadata( + config.llm, + f'eda-{args.dataset}', + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'eda', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, + data_split=args.data_split, + details={ + 'answerer_model': str(args.answerer_model), + 'openai_api_key': str(args.OPENAI_API_KEY), + }, ) - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True - ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'dataset': args.dataset, - 'data_split': args.data_split, - 'answerer_model': args.answerer_model, - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - eda_dataset = eda_dataset.select(list(range(eval_n_limit))) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_items = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_items.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_items)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + prepared_dataset = prepare_dataset( + eda_dataset.to_pandas(), output_file, args.eval_n_limit, 'text' ) - # ============================================= - # filter out finished instances - new_eda_dataset = [] - for instance in eda_dataset: - if instance['text'].strip() in finished_items: - logger.info( - f'Skipping instance {instance["text"].strip()} as it is already finished.' 
- ) - continue - new_eda_dataset.append(instance) + agent = Agent.get_cls(args.agent_cls)(llm=LLM(config.llm)) - eda_dataset = new_eda_dataset - logger.info( - f'Finished instances: {len(finished_items)}, Remaining instances: {len(eda_dataset)}' + run_evaluation( + prepared_dataset, + metadata, + output_file, + args.eval_num_workers, + process_instance, + 'text', ) - # ============================================= - - pbar = tqdm(total=len(eda_dataset)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["instance_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]}') - logger.info( - f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for instance in eda_dataset: - future = executor.submit( - process_instance, - agent, - instance, - metadata, - args.OPENAI_API_KEY, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/agent_bench/helper.py b/evaluation/agent_bench/helper.py index bb14ff14dc..3a11b5f0b2 100644 --- a/evaluation/agent_bench/helper.py +++ b/evaluation/agent_bench/helper.py @@ -1,9 +1,39 @@ import os import re +from functools import partial +from evaluation.utils.shared import codeact_user_response from opendevin.events.action import CmdRunAction, MessageAction +def try_parse_answer(act) -> str | None: + raw_ans = '' + if isinstance(act, MessageAction) and act.source == 'agent': + raw_ans = act.content + elif isinstance(act, CmdRunAction) and act.source == 'agent': + raw_ans = act.thought + else: + return None + agent_answer = re.findall(r'(.*?)', raw_ans) + if not agent_answer: + return None + return agent_answer[0].strip() + + +FAKE_RESPONSES = { + 'CodeActAgent': partial( + codeact_user_response, encapsulate_solution=True, try_parse=try_parse_answer + ), +} + +INST_SUFFIXES: dict[str, str] = { + 'CodeActAgent': ( + 'When you think you have solved the question, ' + 'please first send your answer to user through message and then exit.\n' + ) +} + + def analysis_size(size_str): size_str = size_str.strip() avails = { @@ -45,17 +75,3 @@ def create_sh_file(filename: str, cmds: str) -> None: with open(filename, 'w', encoding='utf-8') as file: file.write(cmds.replace('\r\n', '\n')) os.chmod(filename, 0o755) - - -def try_parse_answer(act) -> str | None: - raw_ans = '' - if isinstance(act, MessageAction) and act.source == 'agent': - raw_ans = act.content - elif isinstance(act, CmdRunAction) and act.source == 'agent': - raw_ans = act.thought - else: - return None - agent_answer = re.findall(r'(.*?)', raw_ans) - if not agent_answer: - return None - return agent_answer[0].strip() diff --git a/evaluation/agent_bench/run_infer.py b/evaluation/agent_bench/run_infer.py index 
d38292ebe3..60d2952df5 100644 --- a/evaluation/agent_bench/run_infer.py +++ b/evaluation/agent_bench/run_infer.py @@ -1,27 +1,28 @@ import asyncio -import json import logging -import multiprocessing as mp import os -import pathlib import re import shutil -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor import docker +import pandas as pd from datasets import load_dataset -from tqdm import tqdm from evaluation.agent_bench.helper import ( + FAKE_RESPONSES, + INST_SUFFIXES, compare_results, create_sh_file, - try_parse_answer, +) +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + prepare_dataset, + run_evaluation, ) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -31,64 +32,13 @@ from opendevin.llm.llm import LLM from opendevin.runtime.docker.ssh_box import DockerSSHBox -def cleanup(): - print('Cleaning up child processes...') - for process in mp.active_children(): - print(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have solved the task, please first send your answer to user through ' - 'message and then exit .\n' - 'Please encapsulate your final answer (answer ONLY) within and .\n' - 'For example: The answer to the question is 42 .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n' - ) - if state.history: - # check if the last action is an answer, if so, return exit for early exit - last_action, _ = state.history[-1] - ans = try_parse_answer(last_action) - if ans is not None: - return '/exit' - - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { - 'CodeActAgent': codeact_user_response, -} - -AGENT_CLS_TO_INST_SUFFIX = { - 'CodeActAgent': 'When you think you have solved the question, ' - 'please first send your answer to user through message and then exit.\n' -} - - def process_instance( - agent, - instance, - metadata, - eval_output_dir, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): - # ============================================= - # preparation - # ============================================= + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) inst_id = instance.instance_id question = instance.description @@ -104,7 +54,9 @@ def process_instance( # Set up the logger properly, so you can run multiprocessing to parallel the evaluation if reset_logger: # Set up logger - log_file = os.path.join(eval_output_dir, 'logs', f'instance_{inst_id}.log') + log_file = os.path.join( + metadata.eval_output_dir, 'logs', f'instance_{inst_id}.log' + ) # Remove all existing handlers from logger for handler in logger.handlers[:]: logger.removeHandler(handler) @@ -140,7 +92,7 @@ def 
process_instance( 'to you AND NEVER ASK FOR HUMAN HELP.\n' ) # NOTE: You can actually set slightly different instruction for different agents - instruction += AGENT_CLS_TO_INST_SUFFIX[agent.__class__.__name__] + instruction += INST_SUFFIXES[agent.__class__.__name__] # ============================================= # create sandbox and run the agent @@ -164,9 +116,7 @@ def process_instance( run_agent_controller( agent, instruction, - fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[ - agent.__class__.__name__ - ], + fake_user_response_fn=FAKE_RESPONSES[agent.__class__.__name__], sandbox=sandbox, sid=inst_id, ) @@ -262,154 +212,27 @@ def process_instance( if __name__ == '__main__': + id_column = 'instance_id' args = parse_arguments() - - # ============================================= - # load datasets - # ============================================= dataset = load_dataset('iFurySt/AgentBench') agent_bench_tests = dataset['osbench'].to_pandas() - logger.info(f'Loaded {len(agent_bench_tests)} tests.') - - # ============================================= - # handle arguments and prepare for evaluation - # ============================================= - - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_cls = args.agent_cls - assert ( - agent_cls in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_cls}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_op_dir = str( - os.path.join( - args.eval_output_dir, - 'agent_bench', - agent_cls, - model_name + '_maxiter_' + str(max_iterations) + eval_note, - ) + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, + args.eval_output_dir, ) - - pathlib.Path(eval_op_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(str(os.path.join(eval_op_dir, 'logs'))).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_op_dir}') - - meta = { - 'agent_class': agent_cls, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_op_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {meta}') - with open(os.path.join(eval_op_dir, 'metadata.json'), 'w') as f: - json.dump(meta, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - agent_bench_tests = agent_bench_tests[:eval_n_limit] - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_op_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with 
open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_cls}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - # ============================================= - - new_agent_bench_tests = [] - for idx, inst in agent_bench_tests.iterrows(): - if inst.instance_id in finished_instance_ids: - logger.info( - f'Skipping instance {inst.instance_id} as it is already finished.' - ) - continue - new_agent_bench_tests.append(inst) - - agent_bench_tests = new_agent_bench_tests - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(agent_bench_tests)}' - ) - - # ============================================= - # start task - # ============================================= - - pbar = tqdm(total=len(agent_bench_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(fut): - pbar.update(1) - output = fut.result() - pbar.set_description(f'Instance {output["instance_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') - logger.info( - f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]["result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multiprocessing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_cls)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multiprocessing - for inst in agent_bench_tests: - future = executor.submit( - process_instance, - agent, - inst, - meta, - eval_op_dir, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. 
Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/biocoder/run_infer.py b/evaluation/biocoder/run_infer.py index fb5860a2ad..05be46a70b 100644 --- a/evaluation/biocoder/run_infer.py +++ b/evaluation/biocoder/run_infer.py @@ -4,22 +4,26 @@ import logging import multiprocessing as mp import os import pathlib -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor +from functools import partial import pandas as pd from datasets import load_dataset -from tqdm import tqdm from evaluation.biocoder.biocoder_env_box import BiocoderData, BiocoderSSHBox +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller -from opendevin.events.action import MessageAction from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM @@ -32,33 +36,10 @@ def cleanup(): process.join() -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have modified the code in a way that fixes the issue, please run the following command: exit .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n' - ) - if state.history: - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { - 'CodeActAgent': codeact_user_response, + 'CodeActAgent': partial( + codeact_user_response, encapsulate_solution=True, try_parse=None + ), 'MonologueAgent': monologue_user_response, } @@ -112,13 +93,12 @@ def get_test_result(instance, sandbox, workspace_dir_name): def process_instance( - agent: Agent, - instance, - metadata, - skip_workspace_mount, - eval_output_dir, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) instance = BiocoderData(**instance) print(instance) workspace_dir_name = ( @@ -130,15 +110,14 @@ def process_instance( # create process-specific workspace dir # if `not skip_workspace_mount` - we will create a workspace directory for EACH process # so that different agent don't interfere with each other. 
- if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # Setup the logger properly, so you can run multi-processing to parallize the evaluation if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, 'logs', f'instance_{instance.test_case_id}.log' + metadata.eval_output_dir, 'logs', f'instance_{instance.test_case_id}.log' ) # Remove all existing handlers from logger for handler in logger.handlers[:]: @@ -157,8 +136,7 @@ def process_instance( ) logger.addHandler(file_handler) - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') + logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') # NOTE: this is something special we do for SWE-Bench due to the reason described in the previous section # You can omit this if you don't need to setup specialized sandbox @@ -192,25 +170,6 @@ def process_instance( 'You do not need to run the code to check if it works. \n' 'Make sure to include proper formatting in Java and Python, including correct braces and/or indentation.\n' ) - - # instruction = ( - # f'In the file {instance.filePath}, there is a function with a signature and without a body. Your job is to complete the function, according to the given instructions. When you complete the function, respond with the function body, and nothing else.' - # 'The repository has cloned for you to start working. You are not allowed to run any bash commands, just modify the files. \n\n' - # '# Problem Statement\n' - # 'Complete the following function signature:\n\n' - # f'{instance.signature}' - # 'The function should do the following:\n\n' - # f'{instance.promptSummaryOnly}\n\n' - # ) - # - # instruction += ( - # 'IMPORTANT: You should ONLY interact with the environment provided to you AND NEVER ASK FOR HUMAN HELP.\n' - # 'You should NOT modify any other files other than the file intended. This means that you should NOT write any test cases.\n' - # 'Do NOT add any import statements or change anything else other than the writing the function body.\n' - # 'You do not need to run the code to check if it works. 
The system will automatically check the correctness of your code.\n' - # 'Make sure to include proper formatting in Java and Python, including correct braces and/or indentation.\n' - # ) - # NOTE: You can actually set slightly different instruction for different agents instruction += AGENT_CLS_TO_INST_SUFFIX[agent.__class__.__name__] @@ -257,150 +216,27 @@ def process_instance( if __name__ == '__main__': + id_column = 'test_case_id' args = parse_arguments() - - # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing - # so we don't need to manage file uploading to OpenDevin's repo dataset = load_dataset('lilbillbiscuit/biocoder_public') biocoder_tests = dataset['test'].to_pandas() - - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'biocoder', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - eval_output_dir = str(eval_output_dir) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproduciblity - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - biocoder_tests = biocoder_tests.head(eval_n_limit) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_test_case_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_test_case_ids.add(data['test_case_id']) - logger.warning( - f'Output file {output_file} already exists. 
Loaded {len(finished_test_case_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_biocoder_tests = [] - for idx, instance in biocoder_tests.iterrows(): - if instance.test_case_id in finished_test_case_ids: - logger.info( - f'Skipping instance {instance.test_case_id} as it is already finished.' - ) - continue - new_biocoder_tests.append(instance) - - biocoder_tests = pd.DataFrame(new_biocoder_tests) - logger.info( - f'Finished instances: {len(finished_test_case_ids)}, Remaining instances: {len(biocoder_tests)}' - ) - # ============================================= - - pbar = tqdm(total=len(biocoder_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["test_case_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]}') - logger.info( - f'Finished evaluation for instance {output["test_case_id"]}: {output["test_result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # This is SWE-Bench specific - CodeActAgent doesn't require mounted workspace to work - skip_workspace_mount = agent_class == 'CodeActAgent' - logger.info(f'Skipping workspace mount: {skip_workspace_mount}') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for row_idx, instance in biocoder_tests.iterrows(): - future = executor.submit( - process_instance, - agent, - instance, - metadata, - skip_workspace_mount, - eval_output_dir, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. 
Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/bird/run_infer.py b/evaluation/bird/run_infer.py index 7ec0a90f63..864329b102 100644 --- a/evaluation/bird/run_infer.py +++ b/evaluation/bird/run_infer.py @@ -8,17 +8,21 @@ import re import shutil import sqlite3 import subprocess -import time -from concurrent.futures import ProcessPoolExecutor import pandas as pd from datasets import load_dataset from func_timeout import FunctionTimedOut, func_timeout from tqdm import tqdm +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -128,17 +132,17 @@ def get_test_result(instance, path, timeout=30): def process_instance( - agent, instance, metadata, skip_workspace_mount, reset_logger: bool = True + instance: pd.Series, + metadata: EvalMetadata, + reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) workspace_mount_path = os.path.join( config.workspace_mount_path, 'bird_eval_workspace' ) - # create process-specific workspace dir - # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # so that different agent don't interfere with each other. - if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # reset workspace to config config.workspace_mount_path = workspace_mount_path @@ -162,7 +166,7 @@ def process_instance( if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, + metadata.eval_output_dir, 'logs', f'instance_{sid}.log', ) @@ -183,8 +187,7 @@ def process_instance( ) logger.addHandler(file_handler) - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') + logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') # Create file with BIRD instance statements = f""" @@ -386,143 +389,27 @@ def create_prompt(e, database_path): if __name__ == '__main__': + id_column = 'task_id' args = parse_arguments() - # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing - # so we don't need to manage file uploading to OpenDevin's repo - # Due to the large size of the BIRD database, it cannot be hosted on huggingface datasets, so it needs to be downloaded bird_dataset = load_bird() - bird_tests = bird_dataset['test'].to_pandas() - - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/humanevalfix/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - 
assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + dataset = bird_dataset['test'].to_pandas() + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'bird', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - bird_tests = bird_tests.head(eval_n_limit) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['task_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_bird_tests = [] - for idx, instance in bird_tests.iterrows(): - if instance.task_id in finished_instance_ids: - logger.info( - f'Skipping instance {instance.task_id} as it is already finished.' 
- ) - continue - new_bird_tests.append(instance) - - bird_tests = pd.DataFrame(new_bird_tests) - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(bird_tests)}' - ) - # ============================================= - - pbar = tqdm(total=len(bird_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["task_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') - logger.info( - f'Finished evaluation for instance {output["task_id"]}: {output["test_result"]["result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for row_idx, instance in bird_tests.iterrows(): - future = executor.submit( - process_instance, - agent, - instance, - metadata, - skip_workspace_mount=False, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/gaia/run_infer.py b/evaluation/gaia/run_infer.py index 78ed117306..72bc10c27d 100644 --- a/evaluation/gaia/run_infer.py +++ b/evaluation/gaia/run_infer.py @@ -1,23 +1,27 @@ import asyncio -import json import logging -import multiprocessing as mp import os import pathlib import re import shutil -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor +from functools import partial import huggingface_hub +import pandas as pd from datasets import load_dataset -from tqdm import tqdm from evaluation.gaia.scorer import question_scorer +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, get_parser +from opendevin.core.config import config, get_parser from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -29,43 +33,8 @@ DATASET_CACHE_DIR = '~/.cache/open-devin/evals/gaia' DATASET_CACHE_DIR = os.path.expanduser(DATASET_CACHE_DIR) -def cleanup(): - logger.info('Cleaning up child processes...') - for process in mp.active_children(): - logger.info(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have solved the task, please first send your answer to user through message and then exit .\n' - 'Please encapsulate your final answer (answer ONLY) within and .\n' - 'For example: The answer to the question is 42 .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n' - ) - if state.history: - user_msgs 
= [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { - 'CodeActAgent': codeact_user_response, + 'CodeActAgent': partial(codeact_user_response, encapsulate_solution=True), 'MonologueAgent': monologue_user_response, } @@ -74,7 +43,13 @@ AGENT_CLS_TO_INST_SUFFIX = { } -def process_instance(agent, instance, metadata, reset_logger: bool = True): +def process_instance( + instance: pd.Series, + metadata: EvalMetadata, + reset_logger: bool = True, +): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) # create process-specific workspace dir # we will create a workspace directory for EACH process # so that different agent don't interfere with each other. @@ -89,7 +64,7 @@ def process_instance(agent, instance, metadata, reset_logger: bool = True): config.workspace_mount_path = workspace_mount_path # Setup the logger properly, so you can run multi-processing to parallelize the evaluation - eval_output_dir = metadata['eval_output_dir'] + eval_output_dir = metadata.eval_output_dir if reset_logger: # Set up logger log_file = os.path.join( @@ -115,8 +90,9 @@ def process_instance(agent, instance, metadata, reset_logger: bool = True): logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') if instance['file_name'] != '': # if this question comes with a file, we need to save it to the workspace + assert metadata.data_split is not None src_file = os.path.join( - DATASET_CACHE_DIR, '2023', metadata['data_split'], instance['file_name'] + DATASET_CACHE_DIR, '2023', metadata.data_split, instance['file_name'] ) extension_name = instance['file_name'].split('.')[-1] dest_file = os.path.join(workspace_mount_path, f'file.{extension_name}') @@ -218,159 +194,42 @@ if __name__ == '__main__': type=str, help='gaia level to evaluate, eg. 2023_level1', ) - parser.add_argument( - '--data-split', - type=str, - help='data split to evaluate, eg. 
validation', - ) args, _ = parser.parse_known_args() if args.directory: config.workspace_base = os.path.abspath(args.directory) logger.info(f'Setting workspace base to {config.workspace_base}') - # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing - # so we don't need to manage file uploading to OpenDevin's repo - level = args.level - data_split = args.data_split - dataset = load_dataset('gaia-benchmark/GAIA', level) + + metadata = make_metadata( + llm_config=config.llm, + dataset_name='gaia', + agent_class=args.agent_cls, + max_iterations=args.max_iterations, + eval_note=args.eval_note, + eval_output_dir=args.eval_output_dir, + data_split=args.data_split, + details={'gaia-level': args.level}, + ) + + dataset = load_dataset('gaia-benchmark/GAIA', args.level) huggingface_hub.snapshot_download( 'gaia-benchmark/GAIA', repo_type='dataset', local_dir=DATASET_CACHE_DIR, ) - gaia_tests = dataset[data_split] - logger.info(f'Evaluating GAIA-Benchmark {level} {data_split} split') + gaia_tests = dataset[metadata.data_split] - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( - args.eval_output_dir, - 'gaia', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + prepared_dataset = prepare_dataset( + gaia_tests.to_pandas(), output_file, args.eval_n_limit, 'task_id' ) - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + agent = Agent.get_cls(args.agent_cls)(llm=LLM(config.llm)) + + run_evaluation( + dataset=prepared_dataset, + metadata=metadata, + output_file=output_file, + num_workers=args.eval_num_workers, + process_instance_func=process_instance, + id_column='task_id', ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'gaia-level': level, - 'data_split': data_split, - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - gaia_tests = gaia_tests.select(list(range(eval_n_limit))) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_task_ids = set() - if os.path.exists(output_file): - with 
open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_task_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_task_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_gaia_tests = [] - for instance in gaia_tests: - if instance['task_id'] in finished_task_ids: - logger.info( - f'Skipping instance {instance["task_id"]} as it is already finished.' - ) - continue - new_gaia_tests.append(instance) - - gaia_tests = new_gaia_tests - logger.info( - f'Finished instances: {len(finished_task_ids)}, Remaining instances: {len(gaia_tests)}' - ) - # ============================================= - - pbar = tqdm(total=len(gaia_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["instance_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]["score"]}') - logger.info( - f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for instance in gaia_tests: - future = executor.submit( - process_instance, - agent, - instance, - metadata, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - logger.info('KeyboardInterrupt received. 
Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/gpqa/run_infer.py b/evaluation/gpqa/run_infer.py index 9dea241a35..9590b156f9 100644 --- a/evaluation/gpqa/run_infer.py +++ b/evaluation/gpqa/run_infer.py @@ -18,24 +18,26 @@ TODOs: """ import asyncio -import json import logging -import multiprocessing as mp import os import pathlib import random import re -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor import pandas as pd from datasets import load_dataset -from tqdm import tqdm +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, get_parser +from opendevin.core.config import config, get_parser from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -43,41 +45,6 @@ from opendevin.events.action import MessageAction from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM - -def cleanup(): - logger.info('Cleaning up child processes...') - for process in mp.active_children(): - logger.info(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'Feel free to use all tools for calculations and solving the problem, and web-search for finding relevant facts during the process if needed\n' - 'If you think you have reliably finished solving the problem, first generate a message reporting the final concise answer to the user. 
Once that is done, please run the following command: exit .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP TO SOLVE THIS TASK.\n' - ) - if state.history: - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, just generate a final answer message to the user and in the next turn --> run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { 'CodeActAgent': codeact_user_response, 'MonologueAgent': monologue_user_response, @@ -156,16 +123,12 @@ def convert_instance_dict(instance): def process_instance( - agent: Agent, - instance: dict, - metadata: dict, - skip_workspace_mount: bool, - eval_output_dir: str, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): - """ - Process a single instance from the dataset - """ + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) old_workspace_mount_path = config.workspace_mount_path old_workspace_base = config.workspace_base try: @@ -173,31 +136,18 @@ def process_instance( config.workspace_mount_path, '_eval_workspace' ) # create process-specific workspace dir - # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # so that different agent don't interfere with each other. - skip_workspace_mount = False - if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # reset workspace to config config.workspace_base = workspace_mount_path config.workspace_mount_path = workspace_mount_path - # workspace_mount_path = os.path.join(config.workspace_mount_path, '_eval_workspace') - # workspace_mount_path = os.path.abspath(workspace_mount_path) - # # create process-specific workspace dir - # # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # # so that different agent don't interfere with each other. 
-    # if not skip_workspace_mount:
-    # workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid()))
-    # pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True)
-
     # Setup the logger properly, so you can run multi-processing to parallelize the evaluation
     if reset_logger:
         # Set up logger
         log_file = os.path.join(
-            eval_output_dir, 'logs', f'instance_{instance.instance_id}.log'
+            metadata.eval_output_dir, 'logs', f'instance_{instance.instance_id}.log'
         )
         # Remove all existing handlers from logger
         for handler in logger.handlers[:]:
@@ -218,8 +168,7 @@ def process_instance(
     else:
         logger.info(f'Starting evaluation for instance {instance.instance_id}.')

-    if not skip_workspace_mount:
-        logger.info(f'Process-specific workspace mounted at {workspace_mount_path}')
+    logger.info(f'Process-specific workspace mounted at {workspace_mount_path}')

     # ======= Run the agent on the instance =======
     # Prepare instruction for the agent using suggested format in gpqa codebase
@@ -329,148 +278,28 @@ if __name__ == '__main__':
     gpqa_dataset['task_id'] = gpqa_dataset.index
     # gpqa_dataset = dataset['train'].to_pandas().sort_values(by='id').reset_index(drop=True)

-    # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm
-    # for details of how to set `llm_config`
-    if args.llm_config:
-        specified_llm_config = get_llm_config_arg(args.llm_config)
-        if specified_llm_config:
-            config.llm = specified_llm_config
-    logger.info(f'Config for evaluation: {config}')
-
-    # TEST METADATA
-    agent_class = args.agent_cls
-    assert (
-        agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN
-    ), f'Unsupported agent class: {agent_class}'
-    model_name = config.llm.model.split('/')[-1]
-    max_iterations = args.max_iterations
-    eval_note = ''
-    if args.eval_note is not None:
-        eval_note += '_N_' + args.eval_note
-    eval_output_dir = os.path.join(
-        args.eval_output_dir,
-        'gpqa',
-        agent_class,
-        model_name + '_maxiter_' + str(max_iterations) + eval_note,
+    metadata = make_metadata(
+        llm_config=config.llm,
+        dataset_name='gpqa',
+        agent_class=args.agent_cls,
+        max_iterations=args.max_iterations,
+        eval_note=args.eval_note,
+        eval_output_dir=args.eval_output_dir,
+        data_split=args.data_split,
     )
-    pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True)
-    pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir(
-        parents=True, exist_ok=True
-    )
-    logger.info(f'Using evaluation output directory: {eval_output_dir}')
-
-    metadata = {
-        'agent_class': agent_class,
-        'model_name': model_name,
-        'max_iterations': max_iterations,
-        'eval_output_dir': eval_output_dir,
-        'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
-        # get the commit id of current repo for reproduciblity
-        'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD'])
-        .decode('utf-8')
-        .strip(),
-    }
-    logger.info(f'Metadata: {metadata}')
-    with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f:
-        json.dump(metadata, f)
-
-    # LIMIT EVALUATION
-    eval_n_limit = args.eval_n_limit # NOTE: This is useful for debugging and testing using a smaller subset of the dataset
-    if eval_n_limit:
-        # start_index = 20
-        # gpqa_dataset = gpqa_dataset.iloc[start_index:]
-        gpqa_dataset = gpqa_dataset.head(eval_n_limit)
-        logger.info(f'Limiting evaluation to first {eval_n_limit} instances.')
-
-    logger.info('#############################################')
-    logger.info(f'{eval_n_limit} instances will be evaluated.')
-    logger.info('#############################################')
-
-    # OUTPUT FILE
-    output_file = os.path.join(eval_output_dir, 'output.jsonl')
-    logger.info(f'Writing evaluation output to {output_file}')
-    finished_instance_ids = set()
-    if os.path.exists(output_file):
-        with open(output_file, 'r') as f:
-            for line in f:
-                data = json.loads(line)
-                finished_instance_ids.add(data['instance_id'])
-        logger.warning(
-            f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.'
-        )
-    output_fp = open(output_file, 'a')
-
-    logger.info(
-        f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.'
+    output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl')
+    prepared_dataset = prepare_dataset(
+        gpqa_dataset, output_file, args.eval_n_limit, 'task_id'
     )
-    # =============================================
-    # filter out finished instances
-    new_gpqa_dataset = []
-    for idx, instance in gpqa_dataset.iterrows():
-        # instance = convert_instance_dict(instance) # preprocessing
-        if instance.instance_id in finished_instance_ids:
-            logger.info(
-                f'Skipping instance {instance.instance_id} as it is already finished.'
-            )
-            continue
-        new_gpqa_dataset.append(instance)
+    agent = Agent.get_cls(args.agent_cls)(llm=LLM(config.llm))

-    gpqa_dataset = pd.DataFrame(new_gpqa_dataset)
-    logger.info(
-        f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(gpqa_dataset)}'
+    run_evaluation(
+        dataset=prepared_dataset,
+        metadata=metadata,
+        output_file=output_file,
+        num_workers=args.eval_num_workers,
+        process_instance_func=process_instance,
+        id_column='task_id',
     )
-    # =============================================
-
-    pbar = tqdm(total=len(gpqa_dataset))
-
-    # This function tracks the progress AND write the output to a JSONL file
-    def update_progress(future):
-        pbar.update(1)
-        output = future.result()
-        pbar.set_description(f'Instance {output["instance_id"]}')
-        pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}')
-        logger.info(
-            f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]["result"]}'
-        )
-        output_fp.write(json.dumps(output) + '\n')
-        output_fp.flush()
-
-    # This sets the multi-processing
-    num_workers = args.eval_num_workers
-    logger.info(f'Using {num_workers} workers for evaluation.')
-
-    # This is SWE-Bench specific - CodeActAgent doesn't require mounted workspace to work
-    skip_workspace_mount = agent_class == 'CodeActAgent'
-    logger.info(f'Skipping workspace mount: {skip_workspace_mount}')
-
-    # Create the agent
-    agent = Agent.get_cls(agent_class)(llm=LLM(config.llm))
-
-    try:
-        with ProcessPoolExecutor(num_workers) as executor:
-            futures = []
-            # This is how we perform multi-processing
-            for row_idx, instance in gpqa_dataset.iterrows():
-                future = executor.submit(
-                    process_instance,
-                    agent,
-                    instance,
-                    metadata,
-                    skip_workspace_mount,
-                    eval_output_dir,
-                    reset_logger=bool(num_workers > 1),
-                )
-                future.add_done_callback(update_progress)
-                futures.append(future)
-
-            # Wait for all futures to complete
-            for future in futures:
-                future.result()
-    except KeyboardInterrupt:
-        print('KeyboardInterrupt received. Cleaning up...')
-        cleanup()
-
-    output_fp.close()
-    logger.info('Evaluation finished.')
diff --git a/evaluation/humanevalfix/run_infer.py b/evaluation/humanevalfix/run_infer.py
index 5c10bb72ce..fa1de3edf0 100644
--- a/evaluation/humanevalfix/run_infer.py
+++ b/evaluation/humanevalfix/run_infer.py
@@ -10,27 +10,28 @@ TODOs:
 """

 import asyncio
-import json
 import logging
-import multiprocessing as mp
 import os
 import pathlib
-import subprocess
-import time
-from concurrent.futures import ProcessPoolExecutor

 import pandas as pd
 from datasets import load_dataset
 from evaluate import load
-from tqdm import tqdm

+from evaluation.utils.shared import (
+    EvalMetadata,
+    codeact_user_response,
+    make_metadata,
+    monologue_user_response,
+    prepare_dataset,
+    run_evaluation,
+)
 from opendevin.controller.agent import Agent
 from opendevin.controller.state.state import State
-from opendevin.core.config import config, get_llm_config_arg, parse_arguments
+from opendevin.core.config import LLMConfig, config, get_llm_config_arg, parse_arguments
 from opendevin.core.logger import get_console_handler
 from opendevin.core.logger import opendevin_logger as logger
 from opendevin.core.main import run_agent_controller
-from opendevin.events.action import MessageAction
 from opendevin.events.serialization.event import event_to_dict
 from opendevin.llm.llm import LLM

@@ -63,40 +64,6 @@ LANGUAGE_TO_NUM_WORKERS = {
     'python': 4,
 }

-
-def cleanup():
-    logger.info('Cleaning up child processes...')
-    for process in mp.active_children():
-        logger.info(f'Terminating child process: {process.name}')
-        process.terminate()
-        process.join()
-
-
-def codeact_user_response(state: State) -> str:
-    msg = (
-        'Please continue working on the task on whatever approach you think is suitable.\n'
-        'If you think you have modified the code in a way that fixes the issue, please run the following command: exit .\n'
-        'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n'
-    )
-    if state.history:
-        user_msgs = [
-            action
-            for action, _ in state.history
-            if isinstance(action, MessageAction) and action.source == 'user'
-        ]
-        if len(user_msgs) >= 2:
-            # let the agent know that it can give up when it has tried 3 times
-            return (
-                msg
-                + 'If you want to give up, run: exit .\n'
-            )
-    return msg
-
-
-def monologue_user_response(state: State) -> str:
-    raise NotImplementedError('MonologueAgent should never ask for user responses.')
-
-
 AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = {
     'CodeActAgent': codeact_user_response,
     'MonologueAgent': monologue_user_response,
@@ -138,8 +105,12 @@ def get_test_result(instance, path, language='python', timeout=10):


 def process_instance(
-    agent: Agent, instance, metadata, skip_workspace_mount, reset_logger: bool = True
+    instance: pd.Series,
+    metadata: EvalMetadata,
+    reset_logger: bool = True,
 ):
+    # Create the agent
+    agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config))
     old_workspace_mount_path = config.workspace_mount_path
     old_workspace_base = config.workspace_base

@@ -148,11 +119,8 @@ def process_instance(
         config.workspace_mount_path, '_eval_workspace'
     )
     # create process-specific workspace dir
-    # if `not skip_workspace_mount` - we will create a workspace directory for EACH process
-    # so that different agent don't interfere with each other.
- if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # reset workspace to config config.workspace_base = workspace_mount_path @@ -165,7 +133,7 @@ def process_instance( if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, + metadata.eval_output_dir, 'logs', f'instance_{sid}.log', ) @@ -186,8 +154,7 @@ def process_instance( ) logger.addHandler(file_handler) - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') + logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') # Create file with HumanEvalFix problem # Prompt reference: https://github.com/bigcode-project/bigcode-evaluation-harness/blob/84b96da31b7f840b55c5733325346176140cdb6b/bigcode_eval/tasks/humanevalpack.py#L509 @@ -266,136 +233,24 @@ if __name__ == '__main__': ) # TODO: Support other languages hefix_tests = dataset['test'].to_pandas() - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/humanevalfix/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + id_column = 'task_id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'humanevalfix', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - hefix_tests = hefix_tests.head(eval_n_limit) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - 
output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['task_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_hefix_tests = [] - for idx, instance in hefix_tests.iterrows(): - if instance.task_id in finished_instance_ids: - logger.info( - f'Skipping instance {instance.task_id} as it is already finished.' - ) - continue - new_hefix_tests.append(instance) - - hefix_tests = pd.DataFrame(new_hefix_tests) - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(hefix_tests)}' - ) - # ============================================= - - pbar = tqdm(total=len(hefix_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["task_id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') - logger.info( - f'Finished evaluation for instance {output["task_id"]}: {output["test_result"]["result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for row_idx, instance in hefix_tests.iterrows(): - future = executor.submit( - process_instance, - agent, - instance, - metadata, - skip_workspace_mount=False, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. 
Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/logic_reasoning/run_infer.py b/evaluation/logic_reasoning/run_infer.py index f2a85ce8bd..33385f38b5 100644 --- a/evaluation/logic_reasoning/run_infer.py +++ b/evaluation/logic_reasoning/run_infer.py @@ -1,61 +1,30 @@ import asyncio -import json import logging -import multiprocessing as mp import os import pathlib import shutil -import time -from concurrent.futures import ProcessPoolExecutor +import pandas as pd from datasets import load_dataset -from tqdm import tqdm from evaluation.swe_bench.swe_env_box import DockerSSHBox +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, get_parser +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, get_parser from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller -from opendevin.events.action import MessageAction from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM - -def cleanup(): - logger.info('Cleaning up child processes...') - for process in mp.active_children(): - logger.info(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have solved the task, please run the following command: exit .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n' - ) - if state.history: - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { 'CodeActAgent': codeact_user_response, 'MonologueAgent': monologue_user_response, @@ -130,13 +99,12 @@ def get_test_result( def process_instance( - agent, - instance, - dataset_name, - skip_workspace_mount, - eval_output_dir, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) old_workspace_mount_path = config.workspace_mount_path old_workspace_base = config.workspace_base @@ -145,11 +113,8 @@ def process_instance( config.workspace_mount_path, '_eval_workspace' ) # create process-specific workspace dir - # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # so that different agent don't interfere with each other. 
- if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # reset workspace to config config.workspace_base = workspace_mount_path @@ -159,7 +124,7 @@ def process_instance( if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, 'logs', f'instance_{instance["id"]}.log' + metadata.eval_output_dir, 'logs', f'instance_{instance["id"]}.log' ) # Remove all existing handlers from logger for handler in logger.handlers[:]: @@ -178,8 +143,7 @@ def process_instance( ) logger.addHandler(file_handler) - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') + logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') # sandbox = DockerSSHBox() logic_inference_path = os.path.join(workspace_mount_path, 'logic_inference.py') @@ -300,162 +264,30 @@ if __name__ == '__main__': if args.directory: config.workspace_base = os.path.abspath(args.directory) print(f'Setting workspace base to {config.workspace_base}') - # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing - # so we don't need to manage file uploading to OpenDevin's repo dataset_name = args.dataset data_split = args.data_split dataset = load_dataset(f'renma/{dataset_name}') logic_reasoning_tests = dataset[data_split] - logger.info(f'Evaluating logic reasoning dataset {dataset_name} {data_split} split') - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - - eval_output_dir = os.path.join( + id_column = 'id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'logic_reasoning', - agent_class, - dataset_name, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True - ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - logic_reasoning_tests = logic_reasoning_tests.select(list(range(eval_n_limit))) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - start_time = time.strftime('%Y-%m-%d %H:%M:%S') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_task_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = 
json.loads(line) - finished_task_ids.add(data['id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_task_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_logic_reasoning_tests = [] - for instance in logic_reasoning_tests: - if instance['id'] in finished_task_ids: - logger.info( - f'Skipping instance {instance["id"]} as it is already finished.' - ) - continue - new_logic_reasoning_tests.append(instance) - - logic_reasoning_tests = new_logic_reasoning_tests - logger.info( - f'Finished instances: {len(finished_task_ids)}, Remaining instances: {len(logic_reasoning_tests)}' - ) - # ============================================= - - pbar = tqdm(total=len(logic_reasoning_tests)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["id"]}') - pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') - logger.info( - f'Finished evaluation for instance {output["id"]}: {output["test_result"]["result"]}' - ) - output_fp.write(json.dumps(output) + '\n') - # json.dump(output, output_fp, indent=4) - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - # num_workers = 1 - logger.info(f'Using {num_workers} workers for evaluation.') - - # This is SWE-Bench specific - CodeActAgent don't requires mounted workspace to work - skip_workspace_mount = False - logger.info(f'Skipping workspace mount: {skip_workspace_mount}') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for instance in logic_reasoning_tests: - future = executor.submit( - process_instance, - agent, - instance, - dataset_name, - skip_workspace_mount, - eval_output_dir, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. Cleaning up...') - cleanup() - - output_fp.close() - - with open(output_file, 'r') as f: - test_result = [(json.loads(line))['test_result']['result'] for line in f] - - metadata = { - 'Dataset': dataset_name, - 'Data split': data_split, - 'Number of Samples': len(test_result), - 'Agent class': agent_class, - 'Model name': model_name, - 'Start_time': start_time, - 'End_time': time.strftime('%Y-%m-%d %H:%M:%S'), - 'Final Accuracy': f'{sum(test_result)/len(test_result):.2f}', - } - - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f, indent=4) - - logger.info(f'Metadata: {json.dumps(metadata, indent=4)}') - logger.info( - f'Evaluation finished. 
Metadata saved to {eval_output_dir}/metadata.json' + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) diff --git a/evaluation/miniwob/run_infer.py b/evaluation/miniwob/run_infer.py index a80ff44d97..fc84fe3b70 100644 --- a/evaluation/miniwob/run_infer.py +++ b/evaluation/miniwob/run_infer.py @@ -2,17 +2,20 @@ import asyncio import json import logging import os -import pathlib -import subprocess -import time import browsergym.miniwob # noqa F401 register miniwob tasks as gym environments import gymnasium as gym -from tqdm import tqdm +import pandas as pd +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -23,19 +26,30 @@ from opendevin.runtime.tools import RuntimeTool SUPPORTED_AGENT_CLS = {'BrowsingAgent'} +docker_ssh_box: DockerSSHBox | None = None + + +def get_sandbox(): + global docker_ssh_box + if docker_ssh_box is None: + docker_ssh_box = DockerSSHBox() + return docker_ssh_box + def process_instance( - agent: Agent, - env_id: str, - metadata: dict, - eval_output_dir: str, - docker_sandbox: DockerSSHBox, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) + env_id = instance.id # Setup the logger properly, so you can run multi-processing to parallelize the evaluation if reset_logger: # Set up logger - log_file = os.path.join(eval_output_dir, 'logs', f'instance_{env_id}.log') + log_file = os.path.join( + metadata.eval_output_dir, 'logs', f'instance_{env_id}.log' + ) # Remove all existing handlers from logger for handler in logger.handlers[:]: logger.removeHandler(handler) @@ -59,7 +73,7 @@ def process_instance( runtime_tools_config = { RuntimeTool.BROWSER: { 'browsergym_eval': env_id, - 'browsergym_eval_save_dir': eval_output_dir, + 'browsergym_eval_save_dir': metadata.eval_output_dir, } } @@ -68,7 +82,7 @@ def process_instance( agent, 'PLACEHOLDER_GOAL', runtime_tools_config=runtime_tools_config, - sandbox=docker_sandbox, + sandbox=get_sandbox(), sid=env_id, ) ) @@ -82,7 +96,7 @@ def process_instance( raise ValueError('State should not be None.') metrics = state.metrics.get() if state.metrics else None - browsergym_eval_dir = os.path.join(eval_output_dir, env_id.split('/')[1]) + browsergym_eval_dir = os.path.join(metadata.eval_output_dir, env_id.split('/')[1]) # read goal with open( os.path.join(browsergym_eval_dir, 'goal.txt'), 'r', encoding='utf-8' @@ -114,111 +128,35 @@ def process_instance( if __name__ == '__main__': args = parse_arguments() - env_ids = [ - id for id in gym.envs.registry.keys() if id.startswith('browsergym/miniwob') - ] + dataset = pd.DataFrame( + { + 'id': [ + id + for id in gym.envs.registry.keys() + if id.startswith('browsergym/miniwob') + ] + } + ) - # Check 
https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert agent_class in SUPPORTED_AGENT_CLS, f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + id_column = 'id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'miniwob', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + _ = get_sandbox() # Initialize the sandbox + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - env_ids = env_ids[:eval_n_limit] - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' 
- ) - - # ============================================= - # filter out finished instances - new_env_ids = [] - for idx in env_ids: - if idx in finished_instance_ids: - logger.info(f'Skipping instance {idx} as it is already finished.') - continue - new_env_ids.append(idx) - - env_ids = new_env_ids - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(env_ids)}' - ) - - # ============================================= - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - docker_sandbox = DockerSSHBox() - for env_id in tqdm(env_ids): - try: - output = process_instance( - agent=agent, - env_id=env_id, - metadata=metadata, - eval_output_dir=eval_output_dir, - docker_sandbox=docker_sandbox, - reset_logger=False, - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - except Exception as e: - logger.error(f'Error processing instance {env_id}: {e}') - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/mint/run_infer.py b/evaluation/mint/run_infer.py index a0d243aa1c..e953b9c0f4 100644 --- a/evaluation/mint/run_infer.py +++ b/evaluation/mint/run_infer.py @@ -1,45 +1,36 @@ import asyncio import functools -import json import logging -import multiprocessing as mp import os import pathlib -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor -from typing import Dict +from typing import Any, Dict -import tasks from datasets import load_dataset -from tqdm import tqdm from evaluation.swe_bench.swe_env_box import DockerSSHBox +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, get_parser +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, get_parser from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM -from .config_variables import TASK_INFO_MAP from .datatypes import TaskState from .env import SimplifiedEnv from .prompts import ToolPromptTemplate from .tasks import Task -def cleanup(): - print('Cleaning up child processes...') - for process in mp.active_children(): - print(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State, task: Task, task_config: Dict[str, int]): +def codeact_user_response_mint(state: State, task: Task, task_config: Dict[str, int]): logger.info(f'Gold reference: {task.reference}') logger.info(f'Task config: {task_config}') @@ -49,7 +40,7 @@ def codeact_user_response(state: State, task: Task, task_config: Dict[str, int]) task_config=task_config, ) last_action, _ = state.history[-1] - result_state: TaskState = env.step(last_action.message) + result_state: TaskState = env.step(last_action.message or '') state.task_state = result_state @@ -63,12 +54,8 @@ def codeact_user_response(state: State, task: Task, task_config: Dict[str, int]) return msg -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { - 'CodeActAgent': codeact_user_response, + 'CodeActAgent': 
codeact_user_response_mint, 'MonologueAgent': monologue_user_response, } @@ -78,26 +65,21 @@ AGENT_CLS_TO_INST_SUFFIX = { def process_instance( - instance: Task, - agent_class, - metadata, - skip_workspace_mount, - eval_output_dir, + agent: Agent, + instance: Any, + metadata: EvalMetadata, reset_logger: bool = True, ): workspace_mount_path = os.path.join(config.workspace_mount_path, '_eval_workspace') # create process-specific workspace dir - # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # so that different agent don't interfere with each other. - if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # Setup the logger properly, so you can run multi-processing to parallelize the evaluation if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, 'logs', f'instance_{instance.task_id}.log' + metadata.eval_output_dir, 'logs', f'instance_{instance.task_id}.log' ) # Remove all existing handlers from logger for handler in logger.handlers[:]: @@ -116,8 +98,7 @@ def process_instance( ) logger.addHandler(file_handler) - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') + logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') # use a session id for concurrent processing sid = instance.task_id + '_' + str(os.getpid()) @@ -136,9 +117,10 @@ def process_instance( exit_code, output = sandbox.execute(f'pip install -r {requirements_sandbox_dest}') # Prepare instruction + assert metadata.details is not None instruction = ToolPromptTemplate(use_tool=True)( - max_total_steps=metadata['max_iterations'], - max_propose_solution=metadata['max_propose_solution'], + max_total_steps=metadata.max_iterations, + max_propose_solution=metadata.details['max_propose_solution'], in_context_example=instance.in_context_example( use_tool=True, with_feedback=False ), @@ -154,8 +136,8 @@ def process_instance( AGENT_CLS_TO_FAKE_USER_RESPONSE_FN[agent.__class__.__name__], task=instance, task_config={ - 'max_iterations': metadata['max_iterations'], - 'max_propose_solution': metadata['max_propose_solution'], + 'max_iterations': metadata.max_iterations, + 'max_propose_solution': metadata.details['max_propose_solution'], }, ) @@ -224,149 +206,28 @@ if __name__ == '__main__': 'ryanhoangt/xingyaoww-mint-bench', name=args.subset, split='test' ) logger.info(f'Evaluating MINT - {args.subset} subset') + mint_tests = mint_dataset.to_pandas() - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') - - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + id_column = 'id' + llm_config = get_llm_config_arg(args.llm_config) if 
args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'mint', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, - args.subset, + details={'max_propose_solution': args.max_propose_solution}, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(mint_dataset, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + agent, + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'max_propose_solution': args.max_propose_solution, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - mint_dataset = mint_dataset.select(range(eval_n_limit)) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}, max propose solution {args.max_propose_solution}.' - ) - - # ============================================= - # filter out finished instances - task_class: Task = getattr(tasks, TASK_INFO_MAP[args.subset]['class']) - new_mint_tests: list[Task] = [] - - for instance in mint_dataset: - if instance['id'] in finished_instance_ids: - logger.info( - f'Skipping instance {instance["id"]} as it is already finished.' 
- ) - continue - # convert to Task object - instance = task_class(**instance) - new_mint_tests.append(instance) - - mint_dataset = new_mint_tests - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(mint_dataset)}' - ) - # ============================================= - - pbar = tqdm(total=len(mint_dataset)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - # logger.info('Output: ', output) - # pbar.set_description(f'Instance {output["instance_id"]}') - # pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') - # logger.info( - # f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]["result"]}' - # ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # This is SWE-Bench specific - CodeActAgent doesn't require mounted workspace to work - skip_workspace_mount = agent_class == 'CodeActAgent' - logger.info(f'Skipping workspace mount: {skip_workspace_mount}') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for instance in mint_dataset: - future = executor.submit( - process_instance, - agent, - instance, - metadata, - skip_workspace_mount, - eval_output_dir, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - # Wait for all futures to complete - for future in futures: - future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. 
Cleaning up...') - cleanup() - - output_fp.close() - logger.info('Evaluation finished.') diff --git a/evaluation/ml_bench/run_infer.py b/evaluation/ml_bench/run_infer.py index a0ef174bcb..b6588c4fd7 100644 --- a/evaluation/ml_bench/run_infer.py +++ b/evaluation/ml_bench/run_infer.py @@ -15,63 +15,31 @@ TODOs: """ import asyncio -import json import logging -import multiprocessing as mp import os import pathlib -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor +from typing import Any from datasets import load_dataset -from tqdm import tqdm +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, get_parser +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, get_parser from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller -from opendevin.events.action import MessageAction from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM from opendevin.runtime.docker.ssh_box import DockerSSHBox - -def cleanup(): - logger.info('Cleaning up child processes...') - for process in mp.active_children(): - logger.info(f'Terminating child process: {process.name}') - process.terminate() - process.join() - - -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have completed the task, please run the following command: exit .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n' - ) - if state.history: - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { 'CodeActAgent': codeact_user_response, 'MonologueAgent': monologue_user_response, @@ -101,7 +69,7 @@ ID2CONDA = { def process_instance( - agent: Agent, instance, metadata, eval_output_dir, reset_logger: bool = True + agent: Agent, instance: Any, metadata: EvalMetadata, reset_logger: bool = True ): old_workspace_mount_path = config.workspace_mount_path old_workspace_base = config.workspace_base @@ -122,7 +90,7 @@ def process_instance( if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, + metadata.eval_output_dir, 'logs', f"instance_{instance['id']}_pid_{os.getpid()}.log", ) @@ -268,129 +236,30 @@ if __name__ == '__main__': args, _ = parser.parse_known_args() data_split = args.eval_split - agent_class = args.agent_cls - num_workers = args.eval_num_workers - - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: 
{config}') # NOTE: It is preferable to load datasets from huggingface datasets and perform post-processing # so we don't need to manage file uploading to OpenDevin's repo ml_bench = load_dataset('super-dainiu/ml-bench', split=data_split).to_pandas() - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - ml_bench = ml_bench.head(eval_n_limit) - logger.info(f'Limiting evaluation to {eval_n_limit} instances.') - - # TEST METADATA - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + id_column = 'instance_id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'ml_bench', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - os.makedirs(eval_output_dir, exist_ok=True) - os.makedirs(os.path.join(eval_output_dir, 'logs'), exist_ok=True) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Evaluating on data split: {data_split}') - logger.info(f'Using {num_workers} worker processes') - logger.info(f'Writing evaluation output to {output_file}') - - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - try: - data = json.loads(line) - except json.JSONDecodeError: - print(f'Error parsing line: {line}') - finished_instance_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, data split {data_split}.' 
+ output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(ml_bench, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + agent, + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - - # Filter out finished instances - new_instances = [ - instance - for _, instance in ml_bench.iterrows() - if instance['id'] not in finished_instance_ids - ] - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(new_instances)}' - ) - - pbar = tqdm(total=len(new_instances)) - - # This function tracks the progress AND writes the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["instance_id"]}') - pbar.set_postfix_str(f'Metrics: {output["metrics"]}') - logger.info( - f'Finished evaluation for instance {output["instance_id"]}: {output["metrics"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - for _, instance in enumerate(new_instances): - future = executor.submit( - process_instance, - agent, - instance, - metadata, - eval_output_dir, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - - for future in futures: - output = future.result() - except KeyboardInterrupt: - print('KeyboardInterrupt received. Cleaning up...') - cleanup() - - logger.info('Evaluation completed.') diff --git a/evaluation/swe_bench/run_infer.py b/evaluation/swe_bench/run_infer.py index 89c8f22f10..02c833456e 100644 --- a/evaluation/swe_bench/run_infer.py +++ b/evaluation/swe_bench/run_infer.py @@ -1,29 +1,30 @@ import asyncio -import json import logging import multiprocessing as mp import os import pathlib -import subprocess -import time -from concurrent.futures import ProcessPoolExecutor -from typing import Any import pandas as pd import toml import whatthepatch from datasets import load_dataset -from tqdm import tqdm import agenthub from evaluation.swe_bench.swe_env_box import SWEBenchSSHBox +from evaluation.utils.shared import ( + EvalMetadata, + codeact_user_response, + make_metadata, + monologue_user_response, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, config, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller -from opendevin.events.action import MessageAction from opendevin.events.serialization.event import event_to_dict from opendevin.llm.llm import LLM @@ -38,31 +39,6 @@ def cleanup(): process.join() -def codeact_user_response(state: State) -> str: - msg = ( - 'Please continue working on the task on whatever approach you think is suitable.\n' - 'If you think you have modified the code in a way that fixes the issue, please run the following command: exit .\n' - 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN 
HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n' - ) - if state.history: - user_msgs = [ - action - for action, _ in state.history - if isinstance(action, MessageAction) and action.source == 'user' - ] - if len(user_msgs) >= 2: - # let the agent know that it can give up when it has tried 3 times - return ( - msg - + 'If you want to give up, run: exit .\n' - ) - return msg - - -def monologue_user_response(state: State) -> str: - raise NotImplementedError('MonologueAgent should never ask for user responses.') - - AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = { 'CodeActAgent': codeact_user_response, 'CodeActSWEAgent': codeact_user_response, @@ -193,30 +169,25 @@ def get_test_result(instance, sandbox, workspace_dir_name): def process_instance( - agent_class: str, - llm_config: dict, - instance: Any, - metadata: dict, - skip_workspace_mount: bool, - eval_output_dir: str, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(llm_config=llm_config)) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) workspace_mount_path = os.path.join(config.workspace_mount_path, '_eval_workspace') # create process-specific workspace dir - # if `not skip_workspace_mount` - we will create a workspace directory for EACH process - # so that different agent don't interfere with each other. - if not skip_workspace_mount: - workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) - pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) + workspace_mount_path = os.path.join(workspace_mount_path, str(os.getpid())) + pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True) # Setup the logger properly, so you can run multi-processing to parallelize the evaluation if reset_logger: # Set up logger log_file = os.path.join( - eval_output_dir, 'infer_logs', f'instance_{instance.instance_id}.log' + metadata.eval_output_dir, + 'infer_logs', + f'instance_{instance.instance_id}.log', ) # Remove all existing handlers from logger for handler in logger.handlers[:]: @@ -237,22 +208,18 @@ def process_instance( else: logger.info(f'Starting evaluation for instance {instance.instance_id}.') - if not skip_workspace_mount: - logger.info(f'Process-specific workspace mounted at {workspace_mount_path}') - # NOTE: this is something special we do for SWE-Bench due to the reason described in the previous section # You can omit this if you don't need to setup specialized sandbox workspace_dir_name = f'{instance.repo}__{instance.version}'.replace('/', '__') sandbox = SWEBenchSSHBox.get_box_for_instance( instance, workspace_dir_name, - skip_workspace_mount=skip_workspace_mount, workspace_mount_path=workspace_mount_path, - sandbox_plugins=agenthub.Agent.get_cls(agent_class).sandbox_plugins, + sandbox_plugins=agenthub.Agent.get_cls(metadata.agent_class).sandbox_plugins, ) # Prepare instruction - if agent_class == 'CodeActSWEAgent': + if metadata.agent_class == 'CodeActSWEAgent': instruction = ( 'We are currently solving the following issue within our repository. 
Here is the issue text:\n' '--- BEGIN ISSUE ---\n' @@ -386,144 +353,35 @@ if __name__ == '__main__': dataset = load_dataset('princeton-nlp/SWE-bench_Lite') swe_bench_tests = filter_dataset(dataset['test'].to_pandas(), 'instance_id') - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') + id_column = 'instance_id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() - # TEST METADATA - agent_class = args.agent_cls - assert ( - agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN - ), f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( - args.eval_output_dir, - 'swe_bench_lite', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, - ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'infer_logs')).mkdir( - parents=True, exist_ok=True - ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - _agent_cls = agenthub.Agent.get_cls(agent_class) + details = {} + _agent_cls = agenthub.Agent.get_cls(args.agent_cls) if hasattr(_agent_cls, 'system_message'): - metadata['system_message'] = _agent_cls.system_message + details['system_message'] = _agent_cls.system_message if hasattr(_agent_cls, 'in_context_example'): - metadata['in_context_example'] = _agent_cls.in_context_example - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) + details['in_context_example'] = _agent_cls.in_context_example - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - swe_bench_tests = swe_bench_tests.head(eval_n_limit) - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' 
+    metadata = make_metadata(
+        llm_config,
+        'swe-bench-lite',
+        args.agent_cls,
+        args.max_iterations,
+        args.eval_note,
+        args.eval_output_dir,
+        details=details,
     )
-    # =============================================
-    # filter out finished instances
-    new_swe_bench_tests = []
-    for idx, instance in swe_bench_tests.iterrows():
-        if instance.instance_id in finished_instance_ids:
-            logger.info(
-                f'Skipping instance {instance.instance_id} as it is already finished.'
-            )
-            continue
-        new_swe_bench_tests.append(instance)
-
-    swe_bench_tests = pd.DataFrame(new_swe_bench_tests)
-    logger.info(
-        f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(swe_bench_tests)}'
+    output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl')
+    instances = prepare_dataset(
+        swe_bench_tests, output_file, args.eval_n_limit, id_column
+    )
+    run_evaluation(
+        instances,
+        metadata,
+        output_file,
+        args.eval_num_workers,
+        process_instance,
+        id_column,
     )
-    # =============================================
-
-    pbar = tqdm(total=len(swe_bench_tests))
-
-    # This function tracks the progress AND write the output to a JSONL file
-    def update_progress(future):
-        pbar.update(1)
-        output = future.result()
-        pbar.set_description(f'Instance {output["instance_id"][:10]}')
-        pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}')
-        logger.info(
-            f'Finished evaluation for instance {output["instance_id"]}: {output["test_result"]["result"]}'
-        )
-        output_fp.write(json.dumps(output) + '\n')
-        output_fp.flush()
-
-    # This sets the multi-processing
-    num_workers = args.eval_num_workers
-    logger.info(f'Using {num_workers} workers for evaluation.')
-
-    # This is SWE-Bench specific - CodeActAgent doesn't require mounted workspace to work
-    skip_workspace_mount = agent_class == 'CodeActAgent'
-    logger.info(f'Skipping workspace mount: {skip_workspace_mount}')
-
-    try:
-        with ProcessPoolExecutor(num_workers) as executor:
-            futures = []
-            # This is how we perform multi-processing
-            for row_idx, instance in swe_bench_tests.iterrows():
-                future = executor.submit(
-                    process_instance,
-                    agent_class,
-                    config.llm,
-                    instance,
-                    metadata,
-                    skip_workspace_mount,
-                    eval_output_dir,
-                    reset_logger=bool(num_workers > 1),
-                )
-                future.add_done_callback(update_progress)
-                futures.append(future)
-
-            # Wait for all futures to complete
-            for future in futures:
-                future.result()
-    except KeyboardInterrupt:
-        print('KeyboardInterrupt received. Cleaning up...')
-        cleanup()
-
-    output_fp.close()
-    logger.info('Evaluation finished.')
diff --git a/evaluation/toolqa/run_infer.py b/evaluation/toolqa/run_infer.py
index cbb78fd183..a35b48a03a 100644
--- a/evaluation/toolqa/run_infer.py
+++ b/evaluation/toolqa/run_infer.py
@@ -1,18 +1,22 @@
 import asyncio
-import json
 import logging
-import multiprocessing as mp
 import os
 import pathlib
-import subprocess
-import time
-from concurrent.futures import ProcessPoolExecutor
+from typing import Any

-from tqdm import tqdm
+import pandas as pd

+from evaluation.utils.shared import (
+    EvalMetadata,
+    codeact_user_response,
+    make_metadata,
+    monologue_user_response,
+    prepare_dataset,
+    run_evaluation,
+)
 from opendevin.controller.agent import Agent
 from opendevin.controller.state.state import State
-from opendevin.core.config import config, get_llm_config_arg, get_parser
+from opendevin.core.config import LLMConfig, config, get_llm_config_arg, get_parser
 from opendevin.core.logger import get_console_handler
 from opendevin.core.logger import opendevin_logger as logger
 from opendevin.core.main import run_agent_controller
@@ -22,40 +26,6 @@ from opendevin.llm.llm import LLM

 from .utils import download_data, download_tools, encode_question, eval_answer, get_data

-
-def cleanup():
-    print('Cleaning up child processes...')
-    for process in mp.active_children():
-        print(f'Terminating child process: {process.name}')
-        process.terminate()
-        process.join()
-
-
-def codeact_user_response(state: State) -> str:
-    msg = (
-        'Please continue working on the task on whatever approach you think is suitable.\n'
-        'When you think you finished the task, respond with `Finish[answer]` where you include your answer in `[]`\n'
-        'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n'
-    )
-    if state.history:
-        user_msgs = [
-            action
-            for action, _ in state.history
-            if isinstance(action, MessageAction) and action.source == 'user'
-        ]
-        if len(user_msgs) >= 2:
-            # let the agent know that it can give up when it has tried 3 times
-            return (
-                msg
-                + 'If you want to give up, run: exit .\n'
-            )
-    return msg
-
-
-def monologue_user_response(state: State) -> str:
-    raise NotImplementedError('MonologueAgent should never ask for user responses.')
-
-
 AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = {
     'CodeActAgent': codeact_user_response,
     'MonologueAgent': monologue_user_response,
@@ -66,7 +36,9 @@ AGENT_CLS_TO_INST_SUFFIX = {
 }


-def process_instance(agent: Agent, task, metadata, reset_logger: bool = True):
+def process_instance(
+    agent: Agent, instance: Any, metadata: EvalMetadata, reset_logger: bool = True
+):
     # create process-specific workspace dir
     # we will create a workspace directory for EACH process
     # so that different agent don't interfere with each other.
@@ -74,10 +46,10 @@ def process_instance(agent: Agent, task, metadata, reset_logger: bool = True):
     pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True)

     # Setup the logger properly, so you can run multi-processing to parallelize the evaluation
-    eval_output_dir = metadata['eval_output_dir']
-    qid = task['qid']
-    question = task['question']
-    answer = task['answer']
+    eval_output_dir = metadata.eval_output_dir
+    qid = instance.qid
+    question = instance.question
+    answer = instance.answer
     if reset_logger:
         # Set up logger
         log_file = os.path.join(eval_output_dir, 'logs', f'instance_{qid}.log')
@@ -139,7 +111,7 @@ def process_instance(agent: Agent, task, metadata, reset_logger: bool = True):
         'text': model_answer_raw,
         'correct': correct,
         'answer_id': 'None',
-        'model_id': metadata['model_name'],
+        'model_id': metadata.model_name,
         'metadata': metadata,
         'history': [
             (event_to_dict(action), event_to_dict(obs)) for action, obs in state.history
@@ -171,36 +143,6 @@ if __name__ == '__main__':
         default='YOUR_WOLFRAMALPHA_APPID',
     )
     args, _ = parser.parse_known_args()
-    if args.directory:
-        config.workspace_base = os.path.abspath(args.directory)
-        print(f'Setting workspace base to {config.workspace_base}')
-    # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm
-    # for details of how to set `llm_config`
-    if args.llm_config:
-        specified_llm_config = get_llm_config_arg(args.llm_config)
-        if specified_llm_config:
-            config.llm = specified_llm_config
-    logger.info(f'Config for evaluation: {config}')
-    agent_class = args.agent_cls
-    assert (
-        agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN
-    ), f'Unsupported agent class: {agent_class}'
-    model_name = config.llm.model.split('/')[-1]
-    max_iterations = args.max_iterations
-    eval_note = ''
-    if args.eval_note is not None:
-        eval_note += '_N_' + args.eval_note
-    eval_output_dir = os.path.join(
-        args.eval_output_dir,
-        'toolqa',
-        agent_class,
-        model_name + '_maxiter_' + str(max_iterations) + eval_note,
-    )
-    pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True)
-    pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir(
-        parents=True, exist_ok=True
-    )
-    logger.info(f'Using evaluation output directory: {eval_output_dir}')

     dataset = ''
     hardness = ''
@@ -215,149 +157,39 @@ if __name__ == '__main__':
         'yelp',
         'genda',
     ]
-    if args.dataset in dataset_choices:
-        dataset = args.dataset
-    else:
+    if args.dataset not in dataset_choices:
         raise ValueError(
             'Please choose from agenda, airbnb, coffee, dblp, flight, gsm8k, scirex, yelp for dataset.'
         )
-    if args.hardness == 'easy':
-        hardness = 'easy'
-    elif args.hardness == 'hard':
-        hardness = 'hard'
-    else:
+    if args.hardness not in ['easy', 'hard']:
         raise ValueError('Please choose from easy and hard for hardness.')
-    logger.info(f'Evaluating ToolQA {dataset} {hardness} test')
     # workspace_mount_path = os.path.join(config.workspace_mount_path, '_eval_workspace')
     workspace_mount_path = config.workspace_mount_path
     pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True)
-    toolqa_test = get_data(dataset, hardness)
+    toolqa_test = pd.DataFrame(get_data(dataset, hardness))
     toolqa_data_path = download_data(workspace_mount_path)
     toolqa_tool_path = download_tools(workspace_mount_path, args.wolfram_alpha_appid)
-    # TEST METADATA
-    metadata = {
-        'dataset': dataset,
-        'hardness': hardness,
-        'agent_class': agent_class,
-        'model_name': model_name,
-        'max_iterations': max_iterations,
-        'eval_output_dir': eval_output_dir,
-        'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
-        # get the commit id of current repo for reproduciblity
-        'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD'])
-        .decode('utf-8')
-        .strip(),
-    }
-    logger.info(f'Metadata: {metadata}')
-    with open(
-        os.path.join(eval_output_dir, f'metadata_{dataset}_{hardness}.json'), 'w'
-    ) as f:
-        json.dump(metadata, f)
-    # LIMIT EVALUATION
-    eval_n_limit = args.eval_n_limit
-    if eval_n_limit:
-        toolqa_test = toolqa_test[:eval_n_limit]
-        logger.info(
-            f'Limiting evaluation to a total of first {eval_n_limit} instances.'
-        )
-    output_file = os.path.join(
-        eval_output_dir, f'output_{model_name}_{dataset}_{hardness}.jsonl'
+    id_column = 'qid'
+    llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig()
+    metadata = make_metadata(
+        llm_config,
+        f'toolqa-{args.dataset}-{args.hardness}',
+        args.agent_cls,
+        args.max_iterations,
+        args.eval_note,
+        args.eval_output_dir,
    )
-    logger.info(f'Writing evaluation output to {output_file}')
-    finished_task_ids = set()
-    if os.path.exists(output_file):
-        with open(output_file, 'r') as f:
-            for line in f:
-                task = json.loads(line)
-                finished_task_ids.add(task['qid'])
-        logger.warning(
-            f'Output file {output_file} already exists. Loaded {len(finished_task_ids)} finished instances.'
-        )
-    output_fp = open(output_file, 'a')
-    logger.info(
-        f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.'
- ) - # ============================================= - # filter out finished instances - new_toolqa_test = [] - for task in toolqa_test: - qid = task['qid'] - if qid in finished_task_ids: - logger.info(f'Skipping instance {qid} as it is already finished.') - continue - new_toolqa_test.append(task) - finished_task_number = len(finished_task_ids) - toolqa_test = new_toolqa_test - logger.info( - f'Finished instances: {finished_task_number}, Remaining instances: {len(toolqa_test)}' - ) - - # ============================================= - pbar = tqdm(total=len(toolqa_test)) - - # This function tracks the progress AND write the output to a JSONL file - def update_progress(future): - pbar.update(1) - output = future.result() - pbar.set_description(f'Instance {output["qid"]}') - pbar.set_postfix_str(f'Test Result: {output["correct"]}') - logger.info( - f'Finished evaluation for instance {output["qid"]}: {output["correct"]}' - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - finished_task_ids.add(output['qid']) - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - # This sets the multi-processing - num_workers = args.eval_num_workers - logger.info(f'Using {num_workers} workers for evaluation.') - try: - with ProcessPoolExecutor(num_workers) as executor: - futures = [] - # This is how we perform multi-processing - for task in toolqa_test: - try: - future = executor.submit( - process_instance, - agent, - task, - metadata, - reset_logger=bool(num_workers > 1), - ) - future.add_done_callback(update_progress) - futures.append(future) - except Exception: - continue - # Wait for all futures to complete - for future in futures: - try: - future.result() - except Exception: - continue - except KeyboardInterrupt: - logger.info('KeyboardInterrupt received. Cleaning up...') - cleanup() - output_fp.close() - total_correct = 0 - output = [] - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - output.append(data) - if data['qid'] in finished_task_ids: - if str(data['correct']).lower() == 'true': - total_correct += 1 - # sort all output by question_id - output = sorted(output, key=lambda x: x['qid']) - with open(output_file, 'w') as f: - for dat in output: - f.write(json.dumps(dat) + '\n') - f.flush() - logger.info( - f'Evaluation finished for {dataset}-{hardness}. 
Total: {len(toolqa_test)+finished_task_number}; Correct: {total_correct}; Accuracy: {total_correct / (len(toolqa_test)+finished_task_number)}' + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(toolqa_test, output_file, args.eval_n_limit, id_column) + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config)) + run_evaluation( + agent, + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) diff --git a/evaluation/utils/shared.py b/evaluation/utils/shared.py new file mode 100644 index 0000000000..42413d14ff --- /dev/null +++ b/evaluation/utils/shared.py @@ -0,0 +1,208 @@ +import json +import multiprocessing as mp +import os +import pathlib +import subprocess +import time +from asyncio.log import logger +from concurrent.futures import ProcessPoolExecutor +from typing import Any, Callable + +import pandas as pd +from pydantic import BaseModel +from tqdm import tqdm + +from opendevin.controller.agent import Agent +from opendevin.controller.state.state import State +from opendevin.core.config import LLMConfig +from opendevin.events.action import Action +from opendevin.events.action.message import MessageAction +from opendevin.llm.llm import LLM + + +class EvalMetadata(BaseModel): + agent_class: str + llm_config: LLMConfig + max_iterations: int + eval_output_dir: str + start_time: str + git_commit: str + dataset: str | None = None + data_split: str | None = None + details: dict[str, Any] | None = None + + +def codeact_user_response( + state: State, + encapsulate_solution: bool = False, + try_parse: Callable[[Action], str] | None = None, +) -> str: + encaps_str = ( + ( + 'Please encapsulate your final answer (answer ONLY) within and .\n' + 'For example: The answer to the question is 42 .\n' + ) + if encapsulate_solution + else '' + ) + msg = ( + 'Please continue working on the task on whatever approach you think is suitable.\n' + 'If you think you have solved the task, please first send your answer to user through message and then exit .\n' + f'{encaps_str}' + 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n' + ) + if state.history: + if try_parse is not None: + last_action, _ = state.history[-1] + ans = try_parse(last_action) + if ans is not None: + return '/exit' + user_msgs = [ + action + for action, _ in state.history + if isinstance(action, MessageAction) and action.source == 'user' + ] + if len(user_msgs) >= 2: + # let the agent know that it can give up when it has tried 3 times + return ( + msg + + 'If you want to give up, run: exit .\n' + ) + return msg + + +def monologue_user_response(state: State) -> str: + raise NotImplementedError('MonologueAgent should never ask for user responses.') + + +def cleanup(): + print('Cleaning up child processes...') + for process in mp.active_children(): + print(f'Terminating child process: {process.name}') + process.terminate() + process.join() + + +def make_metadata( + llm_config: LLMConfig, + dataset_name: str, + agent_class: str, + max_iterations: int, + eval_note: str | None, + eval_output_dir: str, + data_split: str | None = None, + details: dict[str, Any] | None = None, +) -> EvalMetadata: + model_name = llm_config.model.split('/')[-1] + eval_note = f'_N_{eval_note}' if eval_note else '' + + eval_output_path = os.path.join( + eval_output_dir, + dataset_name, + agent_class, + f'{model_name}_maxiter_{max_iterations}{eval_note}', + ) + + pathlib.Path(eval_output_path).mkdir(parents=True, exist_ok=True) + pathlib.Path(os.path.join(eval_output_path, 
'logs')).mkdir( + parents=True, exist_ok=True + ) + logger.info(f'Using evaluation output directory: {eval_output_path}') + + metadata = EvalMetadata( + agent_class=agent_class, + llm_config=llm_config, + max_iterations=max_iterations, + eval_output_dir=eval_output_path, + start_time=time.strftime('%Y-%m-%d %H:%M:%S'), + git_commit=subprocess.check_output(['git', 'rev-parse', 'HEAD']) + .decode('utf-8') + .strip(), + dataset=dataset_name, + data_split=data_split, + details=details, + ) + metadata_json = metadata.model_dump_json() + logger.info(f'Metadata: {metadata_json}') + with open(os.path.join(eval_output_path, 'metadata.json'), 'w') as f: + f.write(metadata_json) + + return metadata + + +def prepare_dataset(dataset: pd.DataFrame, output_file, eval_n_limit, id_column): + logger.info(f'Writing evaluation output to {output_file}') + finished_ids = set() + if os.path.exists(output_file): + with open(output_file, 'r') as f: + for line in f: + data = json.loads(line) + finished_ids.add(data[id_column]) + logger.warning( + f'Output file {output_file} already exists. Loaded {len(finished_ids)} finished instances.' + ) + + if eval_n_limit: + dataset = dataset.head(eval_n_limit) + logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') + + new_dataset = [ + instance + for _, instance in dataset.iterrows() + if instance[id_column] not in finished_ids + ] + logger.info( + f'Finished instances: {len(finished_ids)}, Remaining instances: {len(new_dataset)}' + ) + + return pd.DataFrame(new_dataset) + + +def run_evaluation( + dataset: pd.DataFrame, + metadata: EvalMetadata, + output_file: str, + num_workers: int, + process_instance_func: Callable[[pd.Series, EvalMetadata, bool], Any], + id_column: str, +): + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(metadata.llm_config)) + logger.info( + f'Evaluation started with Agent {agent.__class__.name}, ' + f'model {agent.llm.model_name}, max iterations {metadata.max_iterations}.' + ) + pbar = tqdm(total=len(dataset)) + output_fp = open(output_file, 'a') + + def update_progress(future): + pbar.update(1) + output = future.result() + pbar.set_description(f'Instance {output[id_column]}') + pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}') + logger.info( + f'Finished evaluation for instance {output[id_column]}: {output["test_result"]["result"]}' + ) + output_fp.write(json.dumps(output) + '\n') + output_fp.flush() + + try: + with ProcessPoolExecutor(num_workers) as executor: + futures = [] + for _, instance in dataset.iterrows(): + future = executor.submit( + process_instance_func, + instance, + metadata, + bool(num_workers > 1), + ) + future.add_done_callback(update_progress) + futures.append(future) + + for future in futures: + future.result() + except KeyboardInterrupt: + print('KeyboardInterrupt received. 
Cleaning up...') + cleanup() + + output_fp.close() + logger.info('Evaluation finished.') diff --git a/evaluation/webarena/run_infer.py b/evaluation/webarena/run_infer.py index 32112caf26..90e5c187dc 100644 --- a/evaluation/webarena/run_infer.py +++ b/evaluation/webarena/run_infer.py @@ -2,17 +2,20 @@ import asyncio import json import logging import os -import pathlib -import subprocess -import time import browsergym.webarena # noqa F401 register webarena tasks as gym environments import gymnasium as gym -from tqdm import tqdm +import pandas as pd +from evaluation.utils.shared import ( + EvalMetadata, + make_metadata, + prepare_dataset, + run_evaluation, +) from opendevin.controller.agent import Agent from opendevin.controller.state.state import State -from opendevin.core.config import config, get_llm_config_arg, parse_arguments +from opendevin.core.config import LLMConfig, get_llm_config_arg, parse_arguments from opendevin.core.logger import get_console_handler from opendevin.core.logger import opendevin_logger as logger from opendevin.core.main import run_agent_controller @@ -24,18 +27,30 @@ from opendevin.runtime.tools import RuntimeTool SUPPORTED_AGENT_CLS = {'BrowsingAgent'} +docker_ssh_box: DockerSSHBox | None = None + + +def get_sandbox(): + global docker_ssh_box + if docker_ssh_box is None: + docker_ssh_box = DockerSSHBox() + return docker_ssh_box + + def process_instance( - agent: Agent, - env_id: str, - metadata: dict, - eval_output_dir: str, - docker_sandbox: DockerSSHBox, + instance: pd.Series, + metadata: EvalMetadata, reset_logger: bool = True, ): + # Create the agent + agent = Agent.get_cls(metadata.agent_class)(llm=LLM(llm_config=metadata.llm_config)) + env_id = instance.id # Setup the logger properly, so you can run multi-processing to parallelize the evaluation if reset_logger: # Set up logger - log_file = os.path.join(eval_output_dir, 'logs', f'instance_{env_id}.log') + log_file = os.path.join( + metadata.eval_output_dir, 'logs', f'instance_{env_id}.log' + ) # Remove all existing handlers from logger for handler in logger.handlers[:]: logger.removeHandler(handler) @@ -59,7 +74,7 @@ def process_instance( runtime_tools_config = { RuntimeTool.BROWSER: { 'browsergym_eval': env_id, - 'browsergym_eval_save_dir': eval_output_dir, + 'browsergym_eval_save_dir': metadata.eval_output_dir, } } @@ -68,7 +83,7 @@ def process_instance( agent, 'PLACEHOLDER_GOAL', runtime_tools_config=runtime_tools_config, - sandbox=docker_sandbox, + sandbox=get_sandbox(), sid=env_id, ) ) @@ -82,7 +97,7 @@ def process_instance( raise ValueError('State should not be None.') metrics = state.metrics.get() if state.metrics else None - browsergym_eval_dir = os.path.join(eval_output_dir, env_id.split('/')[1]) + browsergym_eval_dir = os.path.join(metadata.eval_output_dir, env_id.split('/')[1]) # read goal with open( os.path.join(browsergym_eval_dir, 'goal.txt'), 'r', encoding='utf-8' @@ -118,108 +133,34 @@ if __name__ == '__main__': id for id in gym.envs.registry.keys() if id.startswith('browsergym/webarena') ] - # Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm - # for details of how to set `llm_config` - if args.llm_config: - specified_llm_config = get_llm_config_arg(args.llm_config) - if specified_llm_config: - config.llm = specified_llm_config - logger.info(f'Config for evaluation: {config}') + dataset = pd.DataFrame( + { + 'id': [ + id + for id in gym.envs.registry.keys() + if id.startswith('browsergym/miniwob') + ] + } + ) - # TEST 
METADATA - agent_class = args.agent_cls - assert agent_class in SUPPORTED_AGENT_CLS, f'Unsupported agent class: {agent_class}' - model_name = config.llm.model.split('/')[-1] - max_iterations = args.max_iterations - eval_note = '' - if args.eval_note is not None: - eval_note += '_N_' + args.eval_note - eval_output_dir = os.path.join( + id_column = 'id' + llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig() + metadata = make_metadata( + llm_config, + args.dataset_name, + args.agent_cls, + args.max_iterations, + args.eval_note, args.eval_output_dir, - 'webarena', - agent_class, - model_name + '_maxiter_' + str(max_iterations) + eval_note, ) - - pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True) - pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir( - parents=True, exist_ok=True + output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl') + instances = prepare_dataset(dataset, output_file, args.eval_n_limit, id_column) + _ = get_sandbox() # Initialize the sandbox + run_evaluation( + instances, + metadata, + output_file, + args.eval_num_workers, + process_instance, + id_column, ) - logger.info(f'Using evaluation output directory: {eval_output_dir}') - - metadata = { - 'agent_class': agent_class, - 'model_name': model_name, - 'max_iterations': max_iterations, - 'eval_output_dir': eval_output_dir, - 'start_time': time.strftime('%Y-%m-%d %H:%M:%S'), - # get the commit id of current repo for reproducibility - 'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD']) - .decode('utf-8') - .strip(), - } - logger.info(f'Metadata: {metadata}') - with open(os.path.join(eval_output_dir, 'metadata.json'), 'w') as f: - json.dump(metadata, f) - - # LIMIT EVALUATION - eval_n_limit = args.eval_n_limit - if eval_n_limit: - env_ids = env_ids[:eval_n_limit] - logger.info(f'Limiting evaluation to first {eval_n_limit} instances.') - - # OUTPUT FILE - output_file = os.path.join(eval_output_dir, 'output.jsonl') - logger.info(f'Writing evaluation output to {output_file}') - finished_instance_ids = set() - if os.path.exists(output_file): - with open(output_file, 'r') as f: - for line in f: - data = json.loads(line) - finished_instance_ids.add(data['instance_id']) - logger.warning( - f'Output file {output_file} already exists. Loaded {len(finished_instance_ids)} finished instances.' - ) - output_fp = open(output_file, 'a') - - logger.info( - f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.' - ) - - # ============================================= - # filter out finished instances - new_env_ids = [] - for idx in env_ids: - if idx in finished_instance_ids: - logger.info(f'Skipping instance {idx} as it is already finished.') - continue - new_env_ids.append(idx) - - env_ids = new_env_ids - logger.info( - f'Finished instances: {len(finished_instance_ids)}, Remaining instances: {len(env_ids)}' - ) - - # ============================================= - - # Create the agent - agent = Agent.get_cls(agent_class)(llm=LLM(config.llm)) - - docker_sandbox = DockerSSHBox() - for env_id in tqdm(env_ids): - try: - output = process_instance( - agent=agent, - env_id=env_id, - metadata=metadata, - eval_output_dir=eval_output_dir, - docker_sandbox=docker_sandbox, - reset_logger=False, - ) - output_fp.write(json.dumps(output) + '\n') - output_fp.flush() - except Exception as e: - logger.error(f'Error processing instance {env_id}: {e}') - - output_fp.close() - logger.info('Evaluation finished.')
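Note: a minimal, hedged sketch of how a benchmark-specific run_infer.py is expected to wire the new shared helpers (make_metadata, prepare_dataset, run_evaluation) together after this refactor. The benchmark name 'my-benchmark', the id column 'instance_id', the placeholder dataset row, and the body of process_instance are illustrative assumptions, not part of this diff; only the helper signatures are taken from evaluation/utils/shared.py above.

import os

import pandas as pd

from evaluation.utils.shared import (
    EvalMetadata,
    make_metadata,
    prepare_dataset,
    run_evaluation,
)
from opendevin.core.config import LLMConfig, get_llm_config_arg, get_parser


def process_instance(
    instance: pd.Series, metadata: EvalMetadata, reset_logger: bool = True
) -> dict:
    # run_evaluation() submits this callback to a ProcessPoolExecutor with
    # (instance, metadata, reset_logger) and expects a JSON-serializable dict
    # containing the id column and a test_result entry for progress reporting.
    return {
        'instance_id': instance['instance_id'],  # hypothetical id column
        'test_result': {'result': 'placeholder'},
    }


if __name__ == '__main__':
    args, _ = get_parser().parse_known_args()
    llm_config = (
        get_llm_config_arg(args.llm_config) if args.llm_config else LLMConfig()
    )

    metadata = make_metadata(
        llm_config,
        'my-benchmark',  # hypothetical dataset name; determines the output subdirectory
        args.agent_cls,
        args.max_iterations,
        args.eval_note,
        args.eval_output_dir,
    )
    output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl')

    # stand-in for a real dataset loader
    dataset = pd.DataFrame([{'instance_id': 'example-1', 'text': '...'}])
    instances = prepare_dataset(dataset, output_file, args.eval_n_limit, 'instance_id')

    run_evaluation(
        instances,
        metadata,
        output_file,
        args.eval_num_workers,
        process_instance,
        'instance_id',
    )

Because prepare_dataset reloads output.jsonl and drops instances whose id is already present, and run_evaluation opens the output file in append mode, re-running the same command resumes an interrupted evaluation rather than starting over.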
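Also for reference, a small sketch of what the new EvalMetadata model serializes to: make_metadata() writes metadata.model_dump_json() to metadata.json in the run directory, so that file is simply the pydantic dump of these fields. All field values below are illustrative placeholders, not values produced by this diff.

from evaluation.utils.shared import EvalMetadata
from opendevin.core.config import LLMConfig

metadata = EvalMetadata(
    agent_class='CodeActAgent',
    llm_config=LLMConfig(),  # zero-arg construction, as in the toolqa/webarena fallbacks above
    max_iterations=10,
    eval_output_dir='evaluation/evaluation_outputs/example',  # placeholder path
    start_time='2024-01-01 00:00:00',
    git_commit='0000000',  # placeholder commit hash
    dataset='my-benchmark',  # optional
    data_split='test',  # optional
    details={'note': 'free-form benchmark-specific details'},  # optional
)
print(metadata.model_dump_json())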