Replace file editor with openhands-aci (#4782)

This commit is contained in:
Ryan H. Tran
2024-11-12 20:26:33 +07:00
committed by GitHub
parent 32fdcd58e5
commit d9c5f11046
7 changed files with 41 additions and 648 deletions

View File

@@ -1,60 +1,8 @@
"""This file contains a global singleton of the `EditTool` class as well as raw functions that expose its __call__."""
from .base import CLIResult, ToolError, ToolResult
from .impl import Command, EditTool
_GLOBAL_EDITOR = EditTool()
def _make_api_tool_result(
result: ToolResult,
) -> str:
"""Convert an agent ToolResult to an API ToolResultBlockParam."""
tool_result_content: str = ''
is_error = False
if result.error:
is_error = True
tool_result_content = _maybe_prepend_system_tool_result(result, result.error)
else:
assert result.output, 'Expecting output in file_editor'
tool_result_content = _maybe_prepend_system_tool_result(result, result.output)
assert (
not result.base64_image
), 'Not expecting base64_image as output in file_editor'
if is_error:
return f'ERROR:\n{tool_result_content}'
else:
return tool_result_content
def _maybe_prepend_system_tool_result(result: ToolResult, result_text: str) -> str:
if result.system:
result_text = f'<system>{result.system}</system>\n{result_text}'
return result_text
def file_editor(
command: Command,
path: str,
file_text: str | None = None,
view_range: list[int] | None = None,
old_str: str | None = None,
new_str: str | None = None,
insert_line: int | None = None,
) -> str:
try:
result: CLIResult = _GLOBAL_EDITOR(
command=command,
path=path,
file_text=file_text,
view_range=view_range,
old_str=old_str,
new_str=new_str,
insert_line=insert_line,
)
except ToolError as e:
return _make_api_tool_result(ToolResult(error=e.message))
return _make_api_tool_result(result)
"""This file imports a global singleton of the `EditTool` class as well as raw functions that expose
its __call__.
The implementation of the `EditTool` class can be found at: https://github.com/All-Hands-AI/openhands-aci/.
"""
from openhands_aci.editor import file_editor
__all__ = ['file_editor']

View File

@@ -1,50 +0,0 @@
from dataclasses import dataclass, fields, replace
@dataclass(kw_only=True, frozen=True)
class ToolResult:
"""Represents the result of a tool execution."""
output: str | None = None
error: str | None = None
base64_image: str | None = None
system: str | None = None
def __bool__(self):
return any(getattr(self, field.name) for field in fields(self))
def __add__(self, other: 'ToolResult'):
def combine_fields(
field: str | None, other_field: str | None, concatenate: bool = True
):
if field and other_field:
if concatenate:
return field + other_field
raise ValueError('Cannot combine tool results')
return field or other_field
return ToolResult(
output=combine_fields(self.output, other.output),
error=combine_fields(self.error, other.error),
base64_image=combine_fields(self.base64_image, other.base64_image, False),
system=combine_fields(self.system, other.system),
)
def replace(self, **kwargs):
"""Returns a new ToolResult with the given fields replaced."""
return replace(self, **kwargs)
class CLIResult(ToolResult):
"""A ToolResult that can be rendered as a CLI output."""
class ToolFailure(ToolResult):
"""A ToolResult that represents a failure."""
class ToolError(Exception):
"""Raised when a tool encounters an error."""
def __init__(self, message):
self.message = message

View File

@@ -1,279 +0,0 @@
from collections import defaultdict
from pathlib import Path
from typing import Literal, get_args
from .base import CLIResult, ToolError, ToolResult
from .run import maybe_truncate, run
Command = Literal[
'view',
'create',
'str_replace',
'insert',
'undo_edit',
]
SNIPPET_LINES: int = 4
class EditTool:
"""
An filesystem editor tool that allows the agent to view, create, and edit files.
The tool parameters are defined by Anthropic and are not editable.
Original implementation: https://github.com/anthropics/anthropic-quickstarts/blob/main/computer-use-demo/computer_use_demo/tools/edit.py
"""
_file_history: dict[Path, list[str]]
def __init__(self):
self._file_history = defaultdict(list)
super().__init__()
def __call__(
self,
*,
command: Command,
path: str,
file_text: str | None = None,
view_range: list[int] | None = None,
old_str: str | None = None,
new_str: str | None = None,
insert_line: int | None = None,
**kwargs,
):
_path = Path(path)
self.validate_path(command, _path)
if command == 'view':
return self.view(_path, view_range)
elif command == 'create':
if file_text is None:
raise ToolError('Parameter `file_text` is required for command: create')
self.write_file(_path, file_text)
self._file_history[_path].append(file_text)
return ToolResult(output=f'File created successfully at: {_path}')
elif command == 'str_replace':
if old_str is None:
raise ToolError(
'Parameter `old_str` is required for command: str_replace'
)
return self.str_replace(_path, old_str, new_str)
elif command == 'insert':
if insert_line is None:
raise ToolError(
'Parameter `insert_line` is required for command: insert'
)
if new_str is None:
raise ToolError('Parameter `new_str` is required for command: insert')
return self.insert(_path, insert_line, new_str)
elif command == 'undo_edit':
return self.undo_edit(_path)
raise ToolError(
f'Unrecognized command {command}. The allowed commands for the {self.name} tool are: {", ".join(get_args(Command))}'
)
def validate_path(self, command: str, path: Path):
"""
Check that the path/command combination is valid.
"""
# Check if its an absolute path
if not path.is_absolute():
suggested_path = Path('') / path
raise ToolError(
f'The path {path} is not an absolute path, it should start with `/`. Maybe you meant {suggested_path}?'
)
# Check if path exists
if not path.exists() and command != 'create':
raise ToolError(
f'The path {path} does not exist. Please provide a valid path.'
)
if path.exists() and command == 'create':
raise ToolError(
f'File already exists at: {path}. Cannot overwrite files using command `create`.'
)
# Check if the path points to a directory
if path.is_dir():
if command != 'view':
raise ToolError(
f'The path {path} is a directory and only the `view` command can be used on directories'
)
def view(self, path: Path, view_range: list[int] | None = None):
"""Implement the view command"""
if path.is_dir():
if view_range:
raise ToolError(
'The `view_range` parameter is not allowed when `path` points to a directory.'
)
_, stdout, stderr = run(rf"find {path} -maxdepth 2 -not -path '*/\.*'")
if not stderr:
stdout = f"Here's the files and directories up to 2 levels deep in {path}, excluding hidden items:\n{stdout}\n"
return CLIResult(output=stdout, error=stderr)
file_content = self.read_file(path)
init_line = 1
if view_range:
if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
raise ToolError(
'Invalid `view_range`. It should be a list of two integers.'
)
file_lines = file_content.split('\n')
n_lines_file = len(file_lines)
init_line, final_line = view_range
if init_line < 1 or init_line > n_lines_file:
raise ToolError(
f"Invalid `view_range`: {view_range}. It's first element `{init_line}` should be within the range of lines of the file: {[1, n_lines_file]}"
)
if final_line > n_lines_file:
raise ToolError(
f"Invalid `view_range`: {view_range}. It's second element `{final_line}` should be smaller than the number of lines in the file: `{n_lines_file}`"
)
if final_line != -1 and final_line < init_line:
raise ToolError(
f"Invalid `view_range`: {view_range}. It's second element `{final_line}` should be larger or equal than its first `{init_line}`"
)
if final_line == -1:
file_content = '\n'.join(file_lines[init_line - 1 :])
else:
file_content = '\n'.join(file_lines[init_line - 1 : final_line])
return CLIResult(
output=self._make_output(file_content, str(path), init_line=init_line)
)
def str_replace(self, path: Path, old_str: str, new_str: str | None):
"""Implement the str_replace command, which replaces old_str with new_str in the file content"""
# Read the file content
file_content = self.read_file(path).expandtabs()
old_str = old_str.expandtabs()
new_str = new_str.expandtabs() if new_str is not None else ''
# Check if old_str is unique in the file
occurrences = file_content.count(old_str)
if occurrences == 0:
raise ToolError(
f'No replacement was performed, old_str `{old_str}` did not appear verbatim in {path}.'
)
elif occurrences > 1:
file_content_lines = file_content.split('\n')
lines = [
idx + 1
for idx, line in enumerate(file_content_lines)
if old_str in line
]
raise ToolError(
f'No replacement was performed. Multiple occurrences of old_str `{old_str}` in lines {lines}. Please ensure it is unique'
)
# Replace old_str with new_str
new_file_content = file_content.replace(old_str, new_str)
# Write the new content to the file
self.write_file(path, new_file_content)
# Save the content to history
self._file_history[path].append(file_content)
# Create a snippet of the edited section
replacement_line = file_content.split(old_str)[0].count('\n')
start_line = max(0, replacement_line - SNIPPET_LINES)
end_line = replacement_line + SNIPPET_LINES + new_str.count('\n')
snippet = '\n'.join(new_file_content.split('\n')[start_line : end_line + 1])
# Prepare the success message
success_msg = f'The file {path} has been edited. '
success_msg += self._make_output(
snippet, f'a snippet of {path}', start_line + 1
)
success_msg += 'Review the changes and make sure they are as expected. Edit the file again if necessary.'
return CLIResult(output=success_msg)
def insert(self, path: Path, insert_line: int, new_str: str):
"""Implement the insert command, which inserts new_str at the specified line in the file content."""
file_text = self.read_file(path).expandtabs()
new_str = new_str.expandtabs()
file_text_lines = file_text.split('\n')
n_lines_file = len(file_text_lines)
if insert_line < 0 or insert_line > n_lines_file:
raise ToolError(
f'Invalid `insert_line` parameter: {insert_line}. It should be within the range of lines of the file: {[0, n_lines_file]}'
)
new_str_lines = new_str.split('\n')
new_file_text_lines = (
file_text_lines[:insert_line]
+ new_str_lines
+ file_text_lines[insert_line:]
)
snippet_lines = (
file_text_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
+ new_str_lines
+ file_text_lines[insert_line : insert_line + SNIPPET_LINES]
)
new_file_text = '\n'.join(new_file_text_lines)
snippet = '\n'.join(snippet_lines)
self.write_file(path, new_file_text)
self._file_history[path].append(file_text)
success_msg = f'The file {path} has been edited. '
success_msg += self._make_output(
snippet,
'a snippet of the edited file',
max(1, insert_line - SNIPPET_LINES + 1),
)
success_msg += 'Review the changes and make sure they are as expected (correct indentation, no duplicate lines, etc). Edit the file again if necessary.'
return CLIResult(output=success_msg)
def undo_edit(self, path: Path):
"""Implement the undo_edit command."""
if not self._file_history[path]:
raise ToolError(f'No edit history found for {path}.')
old_text = self._file_history[path].pop()
self.write_file(path, old_text)
return CLIResult(
output=f'Last edit to {path} undone successfully. {self._make_output(old_text, str(path))}'
)
def read_file(self, path: Path):
"""Read the content of a file from a given path; raise a ToolError if an error occurs."""
try:
return path.read_text()
except Exception as e:
raise ToolError(f'Ran into {e} while trying to read {path}') from None
def write_file(self, path: Path, file: str):
"""Write the content of a file to a given path; raise a ToolError if an error occurs."""
try:
path.write_text(file)
except Exception as e:
raise ToolError(f'Ran into {e} while trying to write to {path}') from None
def _make_output(
self,
file_content: str,
file_descriptor: str,
init_line: int = 1,
expand_tabs: bool = True,
):
"""Generate output for the CLI based on the content of a file."""
file_content = maybe_truncate(file_content)
if expand_tabs:
file_content = file_content.expandtabs()
file_content = '\n'.join(
[
f'{i + init_line:6}\t{line}'
for i, line in enumerate(file_content.split('\n'))
]
)
return (
f"Here's the result of running `cat -n` on {file_descriptor}:\n"
+ file_content
+ '\n'
)

View File

@@ -1,44 +0,0 @@
"""Utility to run shell commands asynchronously with a timeout."""
import subprocess
import time
TRUNCATED_MESSAGE: str = '<response clipped><NOTE>To save on context only part of this file has been shown to you. You should retry this tool after you have searched inside the file with `grep -n` in order to find the line numbers of what you are looking for.</NOTE>'
MAX_RESPONSE_LEN: int = 16000
def maybe_truncate(content: str, truncate_after: int | None = MAX_RESPONSE_LEN):
"""Truncate content and append a notice if content exceeds the specified length."""
return (
content
if not truncate_after or len(content) <= truncate_after
else content[:truncate_after] + TRUNCATED_MESSAGE
)
def run(
cmd: str,
timeout: float | None = 120.0, # seconds
truncate_after: int | None = MAX_RESPONSE_LEN,
):
"""Run a shell command synchronously with a timeout."""
start_time = time.time()
try:
process = subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
)
stdout, stderr = process.communicate(timeout=timeout)
return (
process.returncode or 0,
maybe_truncate(stdout, truncate_after=truncate_after),
maybe_truncate(stderr, truncate_after=truncate_after),
)
except subprocess.TimeoutExpired:
process.kill()
elapsed_time = time.time() - start_time
raise TimeoutError(
f"Command '{cmd}' timed out after {elapsed_time:.2f} seconds"
)

35
poetry.lock generated
View File

@@ -1562,6 +1562,17 @@ files = [
{file = "dirtyjson-1.0.8.tar.gz", hash = "sha256:90ca4a18f3ff30ce849d100dcf4a003953c79d3a2348ef056f1d9c22231a25fd"},
]
[[package]]
name = "diskcache"
version = "5.6.3"
description = "Disk Cache -- Disk and file backed persistent cache."
optional = false
python-versions = ">=3"
files = [
{file = "diskcache-5.6.3-py3-none-any.whl", hash = "sha256:5e31b2d5fbad117cc363ebaf6b689474db18a1f6438bc82358b024abd4c2ca19"},
{file = "diskcache-5.6.3.tar.gz", hash = "sha256:2c3a3fa2743d8535d832ec61c2054a1641f41775aa7c556758a109941e33e4fc"},
]
[[package]]
name = "distlib"
version = "0.3.9"
@@ -5629,6 +5640,28 @@ files = [
[package.dependencies]
numpy = {version = ">=1.26.0", markers = "python_version >= \"3.12\""}
[[package]]
name = "openhands-aci"
version = "0.1.0"
description = "An Agent-Computer Interface (ACI) designed for software development agents OpenHands."
optional = false
python-versions = "<4.0,>=3.12"
files = [
{file = "openhands_aci-0.1.0-py3-none-any.whl", hash = "sha256:f28e5a32e394d1e643f79bf8af27fe44d039cb71729d590f9f3ee0c23c075f00"},
{file = "openhands_aci-0.1.0.tar.gz", hash = "sha256:babc55f516efbb27eb7e528662e14b75c902965c48a110408fda824b83ea4461"},
]
[package.dependencies]
diskcache = ">=5.6.3,<6.0.0"
gitpython = "*"
grep-ast = "0.3.3"
litellm = "*"
networkx = "*"
numpy = "*"
pandas = "*"
scipy = "*"
tree-sitter = "0.21.3"
[[package]]
name = "opentelemetry-api"
version = "1.25.0"
@@ -10178,4 +10211,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "245fd4cd56a3c95b2dd4f3a06251f7de82ad0300de7349f0710aac1f92a151b7"
content-hash = "a552f630dfdb9221eda6932e71e67a935c52ebfe4388ec9ef4b3245e7df2f82b"

View File

@@ -63,6 +63,7 @@ opentelemetry-exporter-otlp-proto-grpc = "1.25.0"
modal = "^0.64.145"
runloop-api-client = "0.7.0"
pygithub = "^2.5.0"
openhands-aci = "^0.1.0"
[tool.poetry.group.llama-index.dependencies]
llama-index = "*"

View File

@@ -5,7 +5,6 @@ import sys
import docx
import pytest
from openhands.runtime.plugins.agent_skills.agentskills import file_editor
from openhands.runtime.plugins.agent_skills.file_ops.file_ops import (
WINDOW,
_print_window,
@@ -781,7 +780,7 @@ def test_file_editor_create(tmp_path):
assert result is not None
assert (
result
== f'ERROR:\nThe path {random_file} does not exist. Please provide a valid path.'
== f'ERROR:\nInvalid `path` parameter: {random_file}. The path {random_file} does not exist. Please provide a valid path.'
)
# create a file
@@ -800,218 +799,3 @@ def test_file_editor_create(tmp_path):
1\tLine 6
""".strip().split('\n')
)
@pytest.fixture
def setup_file(tmp_path):
random_dir = tmp_path / 'dir_1'
random_dir.mkdir()
random_file = random_dir / 'a.txt'
return random_file
def test_file_editor_create_and_view(setup_file):
random_file = setup_file
# Test create command
result = file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
print(result)
assert result == f'File created successfully at: {random_file}'
# Test view command for file
result = file_editor(command='view', path=str(random_file))
print(result)
assert (
result.strip().split('\n')
== f"""Here's the result of running `cat -n` on {random_file}:
1\tLine 1
2\tLine 2
3\tLine 3
""".strip().split('\n')
)
# Test view command for directory
result = file_editor(command='view', path=str(random_file.parent))
assert f'{random_file.parent}' in result
assert f'{random_file.name}' in result
def test_file_editor_view_nonexistent(setup_file):
random_file = setup_file
# Test view command for non-existent file
result = file_editor(command='view', path=str(random_file))
assert (
result
== f'ERROR:\nThe path {random_file} does not exist. Please provide a valid path.'
)
def test_file_editor_str_replace(setup_file):
random_file = setup_file
file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
# Test str_replace command
result = file_editor(
command='str_replace',
path=str(random_file),
old_str='Line 2',
new_str='New Line 2',
)
print(result)
assert (
result
== f"""The file {random_file} has been edited. Here's the result of running `cat -n` on a snippet of {random_file}:
1\tLine 1
2\tNew Line 2
3\tLine 3
Review the changes and make sure they are as expected. Edit the file again if necessary."""
)
# View the file after str_replace
result = file_editor(command='view', path=str(random_file))
print(result)
assert (
result.strip().split('\n')
== f"""Here's the result of running `cat -n` on {random_file}:
1\tLine 1
2\tNew Line 2
3\tLine 3
""".strip().split('\n')
)
def test_file_editor_str_replace_non_existent(setup_file):
random_file = setup_file
file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
# Test str_replace with non-existent string
result = file_editor(
command='str_replace',
path=str(random_file),
old_str='Non-existent Line',
new_str='New Line',
)
print(result)
assert (
result
== f'ERROR:\nNo replacement was performed, old_str `Non-existent Line` did not appear verbatim in {random_file}.'
)
def test_file_editor_insert(setup_file):
random_file = setup_file
file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
# Test insert command
result = file_editor(
command='insert', path=str(random_file), insert_line=2, new_str='Inserted Line'
)
print(result)
assert (
result
== f"""The file {random_file} has been edited. Here's the result of running `cat -n` on a snippet of the edited file:
1\tLine 1
2\tLine 2
3\tInserted Line
4\tLine 3
Review the changes and make sure they are as expected (correct indentation, no duplicate lines, etc). Edit the file again if necessary."""
)
# View the file after insert
result = file_editor(command='view', path=str(random_file))
assert (
result.strip().split('\n')
== f"""Here's the result of running `cat -n` on {random_file}:
1\tLine 1
2\tLine 2
3\tInserted Line
4\tLine 3
""".strip().split('\n')
)
def test_file_editor_insert_invalid_line(setup_file):
random_file = setup_file
file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
# Test insert with invalid line number
result = file_editor(
command='insert',
path=str(random_file),
insert_line=10,
new_str='Invalid Insert',
)
assert (
result
== 'ERROR:\nInvalid `insert_line` parameter: 10. It should be within the range of lines of the file: [0, 3]'
)
def test_file_editor_undo_edit(setup_file):
random_file = setup_file
result = file_editor(
command='create', path=str(random_file), file_text='Line 1\nLine 2\nLine 3'
)
print(result)
assert result == f"""File created successfully at: {random_file}"""
# Make an edit
result = file_editor(
command='str_replace',
path=str(random_file),
old_str='Line 2',
new_str='New Line 2',
)
print(result)
assert (
result
== f"""The file {random_file} has been edited. Here's the result of running `cat -n` on a snippet of {random_file}:
1\tLine 1
2\tNew Line 2
3\tLine 3
Review the changes and make sure they are as expected. Edit the file again if necessary."""
)
# Test undo_edit command
result = file_editor(command='undo_edit', path=str(random_file))
print(result)
assert (
result
== f"""Last edit to {random_file} undone successfully. Here's the result of running `cat -n` on {random_file}:
1\tLine 1
2\tLine 2
3\tLine 3
"""
)
# View the file after undo_edit
result = file_editor(command='view', path=str(random_file))
assert (
result.strip().split('\n')
== f"""Here's the result of running `cat -n` on {random_file}:
1\tLine 1
2\tLine 2
3\tLine 3
""".strip().split('\n')
)
def test_file_editor_undo_edit_no_edits(tmp_path):
random_file = tmp_path / 'a.txt'
random_file.touch()
# Test undo_edit when no edits have been made
result = file_editor(command='undo_edit', path=str(random_file))
print(result)
assert result == f'ERROR:\nNo edit history found for {random_file}.'