Move linter and diff utils to openhands-aci (#5020)

This commit is contained in:
Ryan H. Tran
2024-11-16 12:58:26 +07:00
committed by GitHub
parent 9d47ddba38
commit 97f3249205
17 changed files with 18 additions and 1251 deletions

View File

@@ -1,9 +1,11 @@
"""Linter module for OpenHands.
Part of this Linter module is adapted from Aider (Apache 2.0 License, [original code](https://github.com/paul-gauthier/aider/blob/main/aider/linter.py)). Please see the [original repository](https://github.com/paul-gauthier/aider) for more information.
Part of this Linter module is adapted from Aider (Apache 2.0 License, [original
code](https://github.com/paul-gauthier/aider/blob/main/aider/linter.py)).
- Please see the [original repository](https://github.com/paul-gauthier/aider) for more information.
- The detailed implementation of the linter can be found at: https://github.com/All-Hands-AI/openhands-aci.
"""
from openhands.linter.base import LintResult
from openhands.linter.linter import DefaultLinter
from openhands_aci.linter import DefaultLinter, LintResult
__all__ = ['DefaultLinter', 'LintResult']

View File

@@ -1,79 +0,0 @@
from abc import ABC, abstractmethod
from pydantic import BaseModel
class LintResult(BaseModel):
file: str
line: int # 1-indexed
column: int # 1-indexed
message: str
def visualize(self, half_window: int = 3) -> str:
"""Visualize the lint result by print out all the lines where the lint result is found.
Args:
half_window: The number of context lines to display around the error on each side.
"""
with open(self.file, 'r') as f:
file_lines = f.readlines()
# Add line numbers
_span_size = len(str(len(file_lines)))
file_lines = [
f'{i + 1:>{_span_size}}|{line.rstrip()}'
for i, line in enumerate(file_lines)
]
# Get the window of lines to display
assert self.line <= len(file_lines) and self.line > 0
line_idx = self.line - 1
begin_window = max(0, line_idx - half_window)
end_window = min(len(file_lines), line_idx + half_window + 1)
selected_lines = file_lines[begin_window:end_window]
line_idx_in_window = line_idx - begin_window
# Add character hint
_character_hint = (
_span_size * ' '
+ ' ' * (self.column)
+ '^'
+ ' ERROR HERE: '
+ self.message
)
selected_lines[line_idx_in_window] = (
f'\033[91m{selected_lines[line_idx_in_window]}\033[0m'
+ '\n'
+ _character_hint
)
return '\n'.join(selected_lines)
class LinterException(Exception):
"""Base class for all linter exceptions."""
pass
class BaseLinter(ABC):
"""Base class for all linters.
Each linter should be able to lint files of a specific type and return a list of (parsed) lint results.
"""
encoding: str = 'utf-8'
@property
@abstractmethod
def supported_extensions(self) -> list[str]:
"""The file extensions that this linter supports, such as .py or .tsx."""
return []
@abstractmethod
def lint(self, file_path: str) -> list[LintResult]:
"""Lint the given file.
file_path: The path to the file to lint. Required to be absolute.
"""
pass

View File

@@ -1,98 +0,0 @@
from typing import List
from openhands.core.logger import openhands_logger as logger
from openhands.linter.base import BaseLinter, LintResult
from openhands.linter.utils import run_cmd
def python_compile_lint(fname: str) -> list[LintResult]:
try:
with open(fname, 'r') as f:
code = f.read()
compile(code, fname, 'exec') # USE TRACEBACK BELOW HERE
return []
except SyntaxError as err:
err_lineno = getattr(err, 'end_lineno', err.lineno)
err_offset = getattr(err, 'end_offset', err.offset)
if err_offset and err_offset < 0:
err_offset = err.offset
return [
LintResult(
file=fname, line=err_lineno, column=err_offset or 1, message=err.msg
)
]
def flake_lint(filepath: str) -> list[LintResult]:
fatal = 'F821,F822,F831,E112,E113,E999,E902'
flake8_cmd = f'flake8 --select={fatal} --isolated {filepath}'
try:
cmd_outputs = run_cmd(flake8_cmd)
except FileNotFoundError:
return []
results: list[LintResult] = []
if not cmd_outputs:
return results
for line in cmd_outputs.splitlines():
parts = line.split(':')
if len(parts) >= 4:
_msg = parts[3].strip()
if len(parts) > 4:
_msg += ': ' + parts[4].strip()
try:
line_num = int(parts[1])
except ValueError as e:
logger.warning(
f'Error parsing flake8 output for line: {e}. Parsed parts: {parts}. Skipping...'
)
continue
try:
column_num = int(parts[2])
except ValueError as e:
column_num = 1
_msg = (
parts[2].strip() + ' ' + _msg
) # add the unparsed message to the original message
logger.warning(
f'Error parsing flake8 output for column: {e}. Parsed parts: {parts}. Using default column 1.'
)
results.append(
LintResult(
file=filepath,
line=line_num,
column=column_num,
message=_msg,
)
)
return results
class PythonLinter(BaseLinter):
@property
def supported_extensions(self) -> List[str]:
return ['.py']
def lint(self, file_path: str) -> list[LintResult]:
error = flake_lint(file_path)
if not error:
error = python_compile_lint(file_path)
return error
def compile_lint(self, file_path: str, code: str) -> List[LintResult]:
try:
compile(code, file_path, 'exec')
return []
except SyntaxError as e:
return [
LintResult(
file=file_path,
line=e.lineno,
column=e.offset,
message=str(e),
rule='SyntaxError',
)
]

View File

@@ -1,74 +0,0 @@
import warnings
from grep_ast import TreeContext, filename_to_lang
from grep_ast.parsers import PARSERS
from tree_sitter_languages import get_parser
from openhands.linter.base import BaseLinter, LintResult
# tree_sitter is throwing a FutureWarning
warnings.simplefilter('ignore', category=FutureWarning)
def tree_context(fname, code, line_nums):
context = TreeContext(
fname,
code,
color=False,
line_number=True,
child_context=False,
last_line=False,
margin=0,
mark_lois=True,
loi_pad=3,
# header_max=30,
show_top_of_file_parent_scope=False,
)
line_nums = set(line_nums)
context.add_lines_of_interest(line_nums)
context.add_context()
output = context.format()
return output
def traverse_tree(node):
"""Traverses the tree to find errors."""
errors = []
if node.type == 'ERROR' or node.is_missing:
line_no = node.start_point[0] + 1
col_no = node.start_point[1] + 1
error_type = 'Missing node' if node.is_missing else 'Syntax error'
errors.append((line_no, col_no, error_type))
for child in node.children:
errors += traverse_tree(child)
return errors
class TreesitterBasicLinter(BaseLinter):
@property
def supported_extensions(self) -> list[str]:
return list(PARSERS.keys())
def lint(self, file_path: str) -> list[LintResult]:
"""Use tree-sitter to look for syntax errors, display them with tree context."""
lang = filename_to_lang(file_path)
if not lang:
return []
parser = get_parser(lang)
with open(file_path, 'r') as f:
code = f.read()
tree = parser.parse(bytes(code, 'utf-8'))
errors = traverse_tree(tree.root_node)
if not errors:
return []
return [
LintResult(
file=file_path,
line=int(line),
column=int(col),
message=error_details,
)
for line, col, error_details in errors
]

View File

@@ -1,122 +0,0 @@
import os
from collections import defaultdict
from difflib import SequenceMatcher
from openhands.linter.base import BaseLinter, LinterException, LintResult
from openhands.linter.languages.python import PythonLinter
from openhands.linter.languages.treesitter import TreesitterBasicLinter
class DefaultLinter(BaseLinter):
def __init__(self):
self.linters: dict[str, list[BaseLinter]] = defaultdict(list)
self.linters['.py'] = [PythonLinter()]
# Add treesitter linter as a fallback for all linters
self.basic_linter = TreesitterBasicLinter()
for extension in self.basic_linter.supported_extensions:
self.linters[extension].append(self.basic_linter)
self._supported_extensions = list(self.linters.keys())
@property
def supported_extensions(self) -> list[str]:
return self._supported_extensions
def lint(self, file_path: str) -> list[LintResult]:
if not os.path.isabs(file_path):
raise LinterException(f'File path {file_path} is not an absolute path')
file_extension = os.path.splitext(file_path)[1]
linters: list[BaseLinter] = self.linters.get(file_extension, [])
for linter in linters:
res = linter.lint(file_path)
# We always return the first linter's result (higher priority)
if res:
return res
return []
def lint_file_diff(
self, original_file_path: str, updated_file_path: str
) -> list[LintResult]:
"""Only return lint errors that are introduced by the diff.
Args:
original_file_path: The original file path.
updated_file_path: The updated file path.
Returns:
A list of lint errors that are introduced by the diff.
"""
# 1. Lint the original and updated file
original_lint_errors: list[LintResult] = self.lint(original_file_path)
updated_lint_errors: list[LintResult] = self.lint(updated_file_path)
# 2. Load the original and updated file content
with open(original_file_path, 'r') as f:
old_lines = f.readlines()
with open(updated_file_path, 'r') as f:
new_lines = f.readlines()
# 3. Get line numbers that are changed & unchanged
# Map the line number of the original file to the updated file
# NOTE: this only works for lines that are not changed (i.e., equal)
old_to_new_line_no_mapping: dict[int, int] = {}
replace_or_inserted_lines: list[int] = []
for (
tag,
old_idx_start,
old_idx_end,
new_idx_start,
new_idx_end,
) in SequenceMatcher(
isjunk=None,
a=old_lines,
b=new_lines,
).get_opcodes():
if tag == 'equal':
for idx, _ in enumerate(old_lines[old_idx_start:old_idx_end]):
old_to_new_line_no_mapping[old_idx_start + idx + 1] = (
new_idx_start + idx + 1
)
elif tag == 'replace' or tag == 'insert':
for idx, _ in enumerate(old_lines[old_idx_start:old_idx_end]):
replace_or_inserted_lines.append(new_idx_start + idx + 1)
else:
# omit the case of delete
pass
# 4. Get pre-existing errors in unchanged lines
# increased error elsewhere introduced by the newlines
# i.e., we omit errors that are already in original files and report new one
new_line_no_to_original_errors: dict[int, list[LintResult]] = defaultdict(list)
for error in original_lint_errors:
if error.line in old_to_new_line_no_mapping:
new_line_no_to_original_errors[
old_to_new_line_no_mapping[error.line]
].append(error)
# 5. Select errors from lint results in new file to report
selected_errors = []
for error in updated_lint_errors:
# 5.1. Error introduced by replace/insert
if error.line in replace_or_inserted_lines:
selected_errors.append(error)
# 5.2. Error introduced by modified lines that impacted
# the unchanged lines that HAVE pre-existing errors
elif error.line in new_line_no_to_original_errors:
# skip if the error is already reported
# or add if the error is new
if not any(
original_error.message == error.message
and original_error.column == error.column
for original_error in new_line_no_to_original_errors[error.line]
):
selected_errors.append(error)
# 5.3. Error introduced by modified lines that impacted
# the unchanged lines that have NO pre-existing errors
else:
selected_errors.append(error)
# 6. Sort errors by line and column
selected_errors.sort(key=lambda x: (x.line, x.column))
return selected_errors

View File

@@ -1,3 +0,0 @@
from .cmd import check_tool_installed, run_cmd
__all__ = ['run_cmd', 'check_tool_installed']

View File

@@ -1,37 +0,0 @@
import os
import subprocess
def run_cmd(cmd: str, cwd: str | None = None) -> str | None:
"""Run a command and return the output.
If the command succeeds, return None. If the command fails, return the stdout.
"""
process = subprocess.Popen(
cmd.split(),
cwd=cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding='utf-8',
errors='replace',
)
stdout, _ = process.communicate()
if process.returncode == 0:
return None
return stdout
def check_tool_installed(tool_name: str) -> bool:
"""Check if a tool is installed."""
try:
subprocess.run(
[tool_name, '--version'],
check=True,
cwd=os.getcwd(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
return False