Replace bash scripts with Python for git operations (#9914)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Tim O'Farrell 2025-07-29 07:34:52 -06:00 committed by GitHub
parent 8fb3728391
commit d9a595c9b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 611 additions and 508 deletions

View File

@ -133,7 +133,8 @@ class Runtime(FileEditRuntimeMixin):
git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
):
self.git_handler = GitHandler(
execute_shell_fn=self._execute_shell_fn_git_handler
execute_shell_fn=self._execute_shell_fn_git_handler,
create_file_fn=self._create_file_fn_git_handler,
)
self.sid = sid
self.event_stream = event_stream
@ -1017,6 +1018,15 @@ fi
return CommandResult(content=content, exit_code=exit_code)
def _create_file_fn_git_handler(self, path: str, content: str) -> int:
    """
    Create a file on behalf of the GitHandler.

    Issues a FileWriteAction through ``self.write`` and maps the resulting
    observation to a shell-style status code.

    Args:
        path (str): Destination path of the file to write.
        content (str): Text to write into the file.

    Returns:
        int: -1 if the write produced an ErrorObservation, otherwise 0.
    """
    write_action = FileWriteAction(path=path, content=content)
    observation = self.write(write_action)
    return -1 if isinstance(observation, ErrorObservation) else 0
def get_git_changes(self, cwd: str) -> list[dict[str, str]] | None:
self.git_handler.set_cwd(cwd)
changes = self.git_handler.get_git_changes()

View File

@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
Get git changes in the current working directory relative to the remote origin if possible.
NOTE: Since this is run as a script, there should be no imports from project files!
"""
import glob
import json
import os
import subprocess
from pathlib import Path
def run(cmd: str, cwd: str) -> str:
    """Run a shell command and return its stripped standard output.

    Args:
        cmd: Command line, executed through the shell.
        cwd: Directory in which the command is run.

    Returns:
        The decoded stdout of the command with surrounding whitespace removed.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            has the form ``error_running_cmd:<returncode>:<output>`` where the
            output is stderr when present and stdout otherwise.
    """
    result = subprocess.run(
        args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd
    )
    if result.returncode != 0:
        # Prefer stderr for diagnostics; fall back to stdout when it is empty.
        byte_content = result.stderr or result.stdout or b''
        raise RuntimeError(
            f'error_running_cmd:{result.returncode}:{byte_content.decode()}'
        )
    # A successful command may still print warnings to stderr; those must not
    # replace the real output, so only stdout is returned.
    return (result.stdout or b'').decode().strip()
def get_valid_ref(repo_dir: str) -> str | None:
    """Resolve the revision that changes in ``repo_dir`` are compared against.

    Candidates are tried in order: the remote copy of the current branch, the
    merge base of HEAD with the remote default branch, the remote default
    branch itself, and finally the empty tree.

    Args:
        repo_dir: Directory in which the git commands are run.

    Returns:
        The output of ``git rev-parse --verify`` for the first candidate that
        resolves; the current branch name when the default branch of
        ``origin`` cannot be determined; or None when HEAD cannot be resolved
        or no candidate is valid.
    """
    try:
        branch = run('git --no-pager rev-parse --abbrev-ref HEAD', repo_dir)
    except RuntimeError:
        # Not a git repository (or no commits)
        return None

    try:
        head_branch_line = run(
            'git --no-pager remote show origin | grep "HEAD branch"', repo_dir
        )
        default_branch = head_branch_line.split()[-1].strip()
    except RuntimeError:
        # Git repository does not have a remote origin - use current
        return branch

    # The $(...) candidates are expanded by the shell that `run` spawns.
    candidates = (
        f'origin/{branch}',
        f'$(git --no-pager merge-base HEAD "$(git --no-pager rev-parse --abbrev-ref origin/{default_branch})")',
        f'origin/{default_branch}',
        # compares with empty tree
        '$(git --no-pager rev-parse --verify 4b825dc642cb6eb9a060e54bf8d69288fbee4904)',
    )
    for candidate in candidates:
        try:
            return run(f'git --no-pager rev-parse --verify {candidate}', repo_dir)
        except RuntimeError:
            # invalid ref - try the next candidate
            pass
    return None
def get_changes_in_repo(repo_dir: str) -> list[dict[str, str]]:
    """List files in ``repo_dir`` that differ from the comparison ref.

    The comparison ref comes from ``get_valid_ref``, so the result is the
    status relative to that ref - not the same as ``git status``. Untracked
    files (honouring the standard ignore rules) are appended as additions.

    Args:
        repo_dir: Directory of the git repository to inspect.

    Returns:
        A list of ``{'status': ..., 'path': ...}`` dicts where status is one of
        'M', 'A', 'D' or 'U'. Renames are reported as a delete of the old path
        plus an add of the new path; copies as an add of the new path. The list
        is empty when no comparison ref can be determined.

    Raises:
        RuntimeError: If a git command fails or a line of the diff output has
            an unexpected shape or status code.
    """
    ref = get_valid_ref(repo_dir)
    if not ref:
        return []

    # Get changed files
    changed_files = run(
        f'git --no-pager diff --name-status {ref}', repo_dir
    ).splitlines()
    changes = []
    for line in changed_files:
        if not line.strip():
            raise RuntimeError(f'unexpected_value_in_git_diff:{changed_files}')

        # Expected shapes of a --name-status line:
        #   * "A<TAB>file.txt"
        #   * "R100<TAB>old_file.txt<TAB>new_file.txt" (rename with similarity)
        # git separates the fields with tabs, so splitting on tabs keeps paths
        # that contain spaces intact. Lines without any tab fall back to
        # whitespace splitting, as before.
        parts = line.split('\t') if '\t' in line else line.split()
        if len(parts) < 2:
            raise RuntimeError(f'unexpected_value_in_git_diff:{changed_files}')

        status = parts[0].strip()

        # Rename (status starts with 'R' followed by a similarity percentage):
        # convert to delete (old path) + add (new path)
        if status.startswith('R') and len(parts) == 3:
            changes.append({'status': 'D', 'path': parts[1]})
            changes.append({'status': 'A', 'path': parts[2]})
            continue

        # Copy (status starts with 'C' followed by a similarity percentage):
        # only the new path is added, the original remains
        if status.startswith('C') and len(parts) == 3:
            changes.append({'status': 'A', 'path': parts[2]})
            continue

        # Regular operations (M, A, D, etc.) have exactly one path
        if len(parts) != 2:
            raise RuntimeError(f'unexpected_value_in_git_diff:{changed_files}')
        path = parts[1]

        if status == '??':
            status = 'A'
        elif status == '*':
            status = 'M'

        # Only the valid single-character status codes are accepted
        if status not in {'M', 'A', 'D', 'U'}:
            raise RuntimeError(f'unexpected_status_in_git_diff:{changed_files}')
        changes.append({'status': status, 'path': path})

    # Get untracked files
    untracked_files = run(
        'git --no-pager ls-files --others --exclude-standard', repo_dir
    ).splitlines()
    changes.extend({'status': 'A', 'path': path} for path in untracked_files if path)
    return changes
def get_git_changes(cwd: str) -> list[dict[str, str]]:
    """Collect git changes for ``cwd`` and for repositories directly below it.

    Immediate subdirectories of ``cwd`` that contain a ``.git`` entry are
    treated as separate repositories: whatever the repository at ``cwd``
    reports for them is discarded and replaced by the changes each nested
    repository reports itself, with paths prefixed by the subdirectory name.

    Args:
        cwd: The workspace directory.

    Returns:
        A list of ``{'status': ..., 'path': ...}`` dicts sorted by path, with
        paths relative to ``cwd``.
    """
    git_dirs = {
        os.path.dirname(f)[2:]  # drop the leading './'
        for f in glob.glob('./*/.git', root_dir=cwd, recursive=True)
    }
    # Match whole path components only: a nested repository "foo" must not
    # hide a sibling file such as "foobar.txt" in the parent repository.
    nested_prefixes = tuple(f'{git_dir}/' for git_dir in git_dirs)

    def _in_nested_repo(path: str) -> bool:
        return path in git_dirs or path.startswith(nested_prefixes)

    # First try the workspace directory, dropping anything that belongs to one
    # of the nested git directories
    changes = [
        change
        for change in get_changes_in_repo(cwd)
        if not _in_nested_repo(change['path'])
    ]

    # Add changes from git directories
    for git_dir in git_dirs:
        for change in get_changes_in_repo(str(Path(cwd, git_dir))):
            change['path'] = git_dir + '/' + change['path']
            changes.append(change)

    changes.sort(key=lambda change: change['path'])
    return changes
# Script entry point: print the changes for the current working directory as
# JSON. Any failure is reported as a JSON object with an 'error' key instead.
if __name__ == '__main__':
    try:
        print(json.dumps(get_git_changes(os.getcwd())))
    except Exception as exc:
        print(json.dumps({'error': str(exc)}))

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Get git diff in a single git file for the closest git repo in the file system
NOTE: Since this is run as a script, there should be no imports from project files!
"""
import json
import os
import subprocess
import sys
from pathlib import Path
def get_closest_git_repo(path: Path) -> Path | None:
while True:
path = path.parent
git_path = Path(path, '.git')
if git_path.is_dir():
return path
if path.parent == path:
return None
def run(cmd: str, cwd: str) -> str:
    """Run a shell command and return its stripped standard output.

    Args:
        cmd: Command line, executed through the shell.
        cwd: Directory in which the command is run.

    Returns:
        The decoded stdout of the command with surrounding whitespace removed.

    Raises:
        RuntimeError: If the command exits with a non-zero status. The message
            has the form ``error_running_cmd:<returncode>:<output>`` where the
            output is stderr when present and stdout otherwise.
    """
    result = subprocess.run(
        args=cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd
    )
    if result.returncode != 0:
        # Prefer stderr for diagnostics; fall back to stdout when it is empty.
        byte_content = result.stderr or result.stdout or b''
        raise RuntimeError(
            f'error_running_cmd:{result.returncode}:{byte_content.decode()}'
        )
    # A successful command may still print warnings to stderr; those must not
    # replace the real output (e.g. file content from `git show`).
    return (result.stdout or b'').decode().strip()
def get_valid_ref(repo_dir: str) -> str | None:
    """Resolve the revision that a file in ``repo_dir`` is compared against.

    Candidates are tried in order: the remote copy of the current branch, the
    merge base of HEAD with the remote default branch, the remote default
    branch itself, and finally the empty tree.

    Args:
        repo_dir: Directory in which the git commands are run.

    Returns:
        The revision printed by ``git rev-parse --verify`` for the first
        candidate that resolves; the current branch name when the default
        branch of ``origin`` cannot be determined; or None when HEAD cannot be
        resolved or no candidate is valid.
    """
    try:
        current_branch = run('git --no-pager rev-parse --abbrev-ref HEAD', repo_dir)
    except RuntimeError:
        # Not a git repository (Or no commits)
        return None

    try:
        default_branch = (
            run('git --no-pager remote show origin | grep "HEAD branch"', repo_dir)
            .split()[-1]
            .strip()
        )
    except RuntimeError:
        # Git repository does not have a remote origin - use current
        return current_branch

    ref_current_branch = f'origin/{current_branch}'
    ref_non_default_branch = f'$(git --no-pager merge-base HEAD "$(git --no-pager rev-parse --abbrev-ref origin/{default_branch})")'
    ref_default_branch = f'origin/{default_branch}'
    ref_new_repo = '$(git --no-pager rev-parse --verify 4b825dc642cb6eb9a060e54bf8d69288fbee4904)'  # compares with empty tree
    refs = [
        ref_current_branch,
        ref_non_default_branch,
        ref_default_branch,
        ref_new_repo,
    ]

    # Find a ref that exists...
    for ref in refs:
        try:
            # Return the resolved revision rather than the candidate itself:
            # some candidates are unevaluated $(...) shell expressions, which
            # callers should not have to embed in their own commands.
            return run(f'git --no-pager rev-parse --verify {ref}', repo_dir)
        except RuntimeError:
            # invalid ref - try next
            continue
    return None
def get_git_diff(relative_file_path: str) -> dict[str, str]:
    """Return the reference and current content of a single file.

    The file is looked up relative to the current working directory, and the
    git commands run in the closest repository found above it.

    Args:
        relative_file_path: Path of the file, relative to the current working
            directory.

    Returns:
        A dict with two keys: ``'original'`` is the file content at the
        comparison ref (empty when there is no usable ref or the file does not
        exist there) and ``'modified'`` is the content on disk with line
        endings normalised to ``\\n`` and no trailing newline (empty when the
        file does not exist).

    Raises:
        ValueError: With message ``no_repo`` when no git repository is found
            above the file.
    """
    # Local import: shlex is only needed here and is not among the module imports.
    import shlex

    path = Path(os.getcwd(), relative_file_path).resolve()
    closest_git_repo = get_closest_git_repo(path)
    if not closest_git_repo:
        raise ValueError('no_repo')

    current_rev = get_valid_ref(str(closest_git_repo))
    original = ''
    if current_rev:
        # git expects forward slashes in "<rev>:<path>". The path is shell
        # quoted so spaces, quotes or '$' in file names are passed literally.
        repo_relative_path = path.relative_to(closest_git_repo).as_posix()
        try:
            original = run(
                f'git show "{current_rev}":{shlex.quote(repo_relative_path)}',
                str(closest_git_repo),
            )
        except RuntimeError:
            # The file does not exist at the comparison ref
            original = ''

    try:
        with open(path, 'r') as f:
            modified = '\n'.join(f.read().splitlines())
    except FileNotFoundError:
        # The file was deleted in the working tree
        modified = ''

    return {
        'modified': modified,
        'original': original,
    }
# Script entry point: the last command line argument is the file path whose
# diff is printed as JSON.
if __name__ == '__main__':
    print(json.dumps(get_git_diff(sys.argv[-1])))

View File

@ -1,6 +1,15 @@
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Callable
from uuid import uuid4
from openhands.core.logger import openhands_logger as logger
from openhands.runtime.utils import git_changes, git_diff
# Default commands for running the helper scripts inside the runtime. If such a
# command fails, GitHandler writes a copy of the script into a temporary
# directory and switches to a command that points at the copy.
GIT_CHANGES_CMD = 'python3 /openhands/code/openhands/runtime/utils/git_changes.py'
# "{file_path}" is a str.format placeholder filled in by GitHandler.get_git_diff.
GIT_DIFF_CMD = (
'python3 /openhands/code/openhands/runtime/utils/git_diff.py "{file_path}"'
)
@dataclass
@ -25,9 +34,13 @@ class GitHandler:
def __init__(
    self,
    execute_shell_fn: Callable[[str, str | None], CommandResult],
    create_file_fn: Callable[[str, str], int],
):
    """
    Initializes the handler with the callbacks it uses to reach the runtime.

    Args:
        execute_shell_fn: Called with a command and a working directory (or None);
            returns a CommandResult.
        create_file_fn: Called with a path and the file content; returns an int
            status code.
    """
    # Callbacks supplied by the owner of this handler
    self.create_file_fn = create_file_fn
    self.execute = execute_shell_fn

    # No working directory until set_cwd is called
    self.cwd: str | None = None

    # Start with the default script commands; these attributes can be replaced later
    self.git_diff_cmd = GIT_DIFF_CMD
    self.git_changes_cmd = GIT_CHANGES_CMD
def set_cwd(self, cwd: str) -> None:
"""
@ -38,148 +51,13 @@ class GitHandler:
"""
self.cwd = cwd
def _is_git_repo(self) -> bool:
    """
    Reports whether the current working directory is inside a Git work tree.

    Returns:
        bool: True if `git rev-parse --is-inside-work-tree` prints "true", otherwise False.
    """
    result = self.execute(
        'git --no-pager rev-parse --is-inside-work-tree', self.cwd
    )
    answer = result.content.strip()
    return answer == 'true'
def _get_current_file_content(self, file_path: str) -> str:
    """
    Reads a file by running `cat` in the current working directory.

    Args:
        file_path (str): Path to the file; inserted into the command as-is.

    Returns:
        str: The content reported by the command.
    """
    cat_cmd = f'cat {file_path}'
    return self.execute(cat_cmd, self.cwd).content
def _verify_ref_exists(self, ref: str) -> bool:
    """
    Checks whether a Git reference can be resolved in the current working directory.

    Args:
        ref (str): The Git reference to check.

    Returns:
        bool: True if `git rev-parse --verify` exits with status 0, otherwise False.
    """
    result = self.execute(f'git --no-pager rev-parse --verify {ref}', self.cwd)
    return not result.exit_code != 0
def _get_ref_content(self, file_path: str) -> str:
"""
Retrieves the content of a file from a valid Git reference.

Builds one bash script that walks up from the file to the closest git repository,
picks the first reference that exists (remote current branch, merge base with the
remote default branch, remote default branch, empty tree) and prints the file
content at that reference.

Args:
file_path (str): The file path in the repository.

Returns:
str: The content of the file from the reference, or an empty string if no
working directory is set or the command exits with a non-zero status.
"""
if not self.cwd:
return ''
# Random marker placed in a bash comment on the last line of the script; it is
# used below to separate the echoed script from the actual output.
unique_id = uuid4().hex
# Single bash command that finds the closest git repository to the file and gets the ref content
cmd = f"""bash -c '
# Convert to absolute path
file_path="$(realpath "{file_path}")"
# Find the closest git repository by walking up the directory tree
current_dir="$(dirname "$file_path")"
git_repo_dir=""
while [[ "$current_dir" != "/" ]]; do
if [[ -d "$current_dir/.git" ]] || git -C "$current_dir" rev-parse --git-dir >/dev/null 2>&1; then
git_repo_dir="$current_dir"
break
fi
current_dir="$(dirname "$current_dir")"
done
# If no git repository found, exit
if [[ -z "$git_repo_dir" ]]; then
exit 1
fi
# Get the file path relative to the git repository root
repo_root="$(cd "$git_repo_dir" && git rev-parse --show-toplevel)"
relative_file_path="${{file_path#${{repo_root}}/}}"
# Function to get current branch
get_current_branch() {{
git -C "$git_repo_dir" rev-parse --abbrev-ref HEAD 2>/dev/null
}}
# Function to get default branch
get_default_branch() {{
git -C "$git_repo_dir" remote show origin 2>/dev/null | grep "HEAD branch" | awk "{{print \\$NF}}" || echo "main"
}}
# Function to verify if a ref exists
verify_ref_exists() {{
git -C "$git_repo_dir" rev-parse --verify "$1" >/dev/null 2>&1
}}
# Get valid reference for comparison
current_branch="$(get_current_branch)"
default_branch="$(get_default_branch)"
# Check if origin remote exists
has_origin="$(git -C "$git_repo_dir" remote | grep -q "^origin$" && echo "true" || echo "false")"
if [[ "$has_origin" == "true" ]]; then
ref_current_branch="origin/$current_branch"
ref_non_default_branch="$(git -C "$git_repo_dir" merge-base HEAD "$(git -C "$git_repo_dir" rev-parse --abbrev-ref origin/$default_branch)" 2>/dev/null || echo "")"
ref_default_branch="origin/$default_branch"
else
# For repositories without origin, try HEAD~1 (previous commit) or empty tree
ref_current_branch="HEAD~1"
ref_non_default_branch=""
ref_default_branch=""
fi
ref_new_repo="$(git -C "$git_repo_dir" rev-parse --verify 4b825dc642cb6eb9a060e54bf8d69288fbee4904 2>/dev/null || echo "")" # empty tree
# Try refs in order of preference
valid_ref=""
for ref in "$ref_current_branch" "$ref_non_default_branch" "$ref_default_branch" "$ref_new_repo"; do
if [[ -n "$ref" ]] && verify_ref_exists "$ref"; then
valid_ref="$ref"
break
fi
done
# If no valid ref found, exit
if [[ -z "$valid_ref" ]]; then
exit 1
fi
# Get the file content from the reference
git -C "$git_repo_dir" show "$valid_ref:$relative_file_path" 2>/dev/null || exit 1
# {unique_id}'"""
result = self.execute(cmd, self.cwd)
if result.exit_code != 0:
return ''
# TODO: The command echoes the bash script. Why?
# Keep only what follows the last occurrence of the marker, i.e. the text after
# the echoed script.
content = result.content.split(f'{unique_id}')[-1]
return content
def _create_python_script_file(self, file: str):
    """
    Copies a local Python script into a new temporary directory via the callbacks.

    The directory is created with `mktemp -d`, the script content is written with
    `create_file_fn`, and the copy is marked executable with `chmod +x`. The results
    of the write and of `chmod` are not checked.

    Args:
        file (str): Path of the local script to copy.

    Returns:
        Path: Location of the copy, named like the source file.
    """
    temp_dir = self.execute('mktemp -d', self.cwd).content.strip()
    script_file = Path(temp_dir, Path(file).name)
    with open(file, 'r') as source:
        self.create_file_fn(str(script_file), source.read())
    self.execute(f'chmod +x "{script_file}"', self.cwd)
    return script_file
def get_git_changes(self) -> list[dict[str, str]] | None:
"""
@ -195,57 +73,31 @@ class GitHandler:
if not self.cwd:
return None
# Single bash command that:
# 1. Creates a list of directories to check (current dir + direct subdirectories)
# 2. For each directory, checks if it's a git repo and gets status
# 3. Outputs in format: REPO_PATH|STATUS|FILE_PATH
cmd = """bash -c '
{
# Check current directory first
echo "."
# List direct subdirectories (excluding hidden ones)
find . -maxdepth 1 -type d ! -name ".*" ! -name "." 2>/dev/null || true
} | while IFS= read -r dir; do
if [ -d "$dir/.git" ] || git -C "$dir" rev-parse --git-dir >/dev/null 2>&1; then
# Get absolute path of the directory
# Get git status for this repository
git -C "$dir" status --porcelain -uall 2>/dev/null | while IFS= read -r line; do
if [ -n "$line" ]; then
# Extract status (first 2 chars) and file path (from char 3 onwards)
status=$(echo "$line" | cut -c1-2)
file_path=$(echo "$line" | cut -c4-)
# Convert status codes to single character
case "$status" in
"M "*|" M") echo "$dir|M|$file_path" ;;
"A "*|" A") echo "$dir|A|$file_path" ;;
"D "*|" D") echo "$dir|D|$file_path" ;;
"R "*|" R") echo "$dir|R|$file_path" ;;
"C "*|" C") echo "$dir|C|$file_path" ;;
"U "*|" U") echo "$dir|U|$file_path" ;;
"??") echo "$dir|A|$file_path" ;;
*) echo "$dir|M|$file_path" ;;
esac
fi
done
fi
done
' """
result = self.execute(self.git_changes_cmd, self.cwd)
if result.exit_code == 0:
try:
changes = json.loads(result.content)
return changes
except Exception:
logger.exception(
'GitHandler:get_git_changes:error',
extra={'content': result.content},
)
return None
result = self.execute(cmd.strip(), self.cwd)
if result.exit_code != 0 or not result.content.strip():
if self.git_changes_cmd != GIT_CHANGES_CMD:
# We have already tried to add a script to the workspace - it did not work
return None
# Parse the output
changes = []
for line in result.content.strip().split('\n'):
if '|' in line:
parts = line.split('|', 2)
if len(parts) == 3:
repo_path, status, file_path = parts
file_path = f'{repo_path}/{file_path}'[2:]
changes.append({'status': status, 'path': file_path})
# We try to add a script for getting git changes to the runtime - legacy runtimes may be missing the script
logger.info(
'GitHandler:get_git_changes: adding git_changes script to runtime...'
)
script_file = self._create_python_script_file(git_changes.__file__)
self.git_changes_cmd = f'python3 {script_file}'
return changes if changes else None
# Try again with the new changes cmd
return self.get_git_changes()
def get_git_diff(self, file_path: str) -> dict[str, str]:
"""
@ -257,36 +109,23 @@ class GitHandler:
Returns:
dict[str, str]: A dictionary containing the original and modified content.
"""
modified = self._get_current_file_content(file_path)
original = self._get_ref_content(file_path)
# If cwd is not set, return None
if not self.cwd:
raise ValueError('no_dir_in_git_diff')
return {
'modified': modified,
'original': original,
}
result = self.execute(self.git_diff_cmd.format(file_path=file_path), self.cwd)
if result.exit_code == 0:
diff = json.loads(result.content)
return diff
if self.git_diff_cmd != GIT_DIFF_CMD:
# We have already tried to add a script to the workspace - it did not work
raise ValueError('error_in_git_diff')
def parse_git_changes(changes_list: list[str]) -> list[dict[str, str]]:
"""
Parses the list of changed files and extracts their statuses and paths.
# We try to add a script for getting git changes to the runtime - legacy runtimes may be missing the script
logger.info('GitHandler:get_git_diff: adding git_diff script to runtime...')
script_file = self._create_python_script_file(git_diff.__file__)
self.git_diff_cmd = f'python3 {script_file} "{{file_path}}"'
Args:
changes_list (list[str]): List of changed file entries.
Returns:
list[dict[str, str]]: Parsed list of file changes with statuses.
"""
result = []
for line in changes_list:
status = line[:2].strip()
path = line[2:].strip()
# Get the first non-space character as the primary status
primary_status = status.replace(' ', '')[0]
result.append(
{
'status': primary_status,
'path': path,
}
)
return result
# Try again with the new changes cmd
return self.get_git_diff(file_path)

View File

@ -1,12 +1,19 @@
import os
import shutil
import subprocess
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
import pytest
from openhands.runtime.utils import git_changes, git_diff, git_handler
from openhands.runtime.utils.git_handler import CommandResult, GitHandler
@pytest.mark.skipif(sys.platform == 'win32', reason='Windows is not supported')
class TestGitHandler(unittest.TestCase):
def setUp(self):
# Create temporary directories for our test repositories
@ -20,11 +27,17 @@ class TestGitHandler(unittest.TestCase):
# Track executed commands for verification
self.executed_commands = []
self.created_files = []
# Initialize the GitHandler with our real execute function
self.git_handler = GitHandler(self._execute_command)
# Initialize the GitHandler with our mock functions
self.git_handler = GitHandler(
execute_shell_fn=self._execute_command, create_file_fn=self._create_file
)
self.git_handler.set_cwd(self.local_dir)
self.git_handler.git_changes_cmd = f'python3 {git_changes.__file__}'
self.git_handler.git_diff_cmd = f'python3 {git_diff.__file__} "{{file_path}}"'
# Set up the git repositories
self._setup_git_repos()
@ -34,202 +47,265 @@ class TestGitHandler(unittest.TestCase):
def _execute_command(self, cmd, cwd=None):
"""Execute a shell command and return the result."""
self.executed_commands.append((cmd, cwd))
try:
result = subprocess.run(
cmd, shell=True, cwd=cwd, capture_output=True, text=True, check=False
result = subprocess.run(
args=cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=cwd,
)
stderr = result.stderr or b''
stdout = result.stdout or b''
return CommandResult((stderr + stdout).decode(), result.returncode)
def run_command(self, cmd, cwd=None):
    """Execute a shell command and raise if it exits with a non-zero status."""
    outcome = self._execute_command(cmd, cwd)
    if outcome.exit_code == 0:
        return
    raise RuntimeError(
        f'command_error:{cmd};{outcome.exit_code};{outcome.content}'
    )
return CommandResult(result.stdout, result.returncode)
except Exception as e:
return CommandResult(str(e), 1)
def _create_file(self, path, content):
"""Mock function for creating files."""
self.created_files.append((path, content))
try:
with open(path, 'w') as f:
f.write(content)
return 0
except Exception:
return -1
def write_file(
    self,
    dir: str,
    name: str,
    additional_content: tuple[str, ...] = ('Line 1', 'Line 2', 'Line 3'),
):
    """Write a file whose first line is its own name, followed by the given lines."""
    target = os.path.join(dir, name)
    with open(target, 'w') as fh:
        # The name and every additional line are separated by a single newline;
        # nothing follows the last line.
        fh.write('\n'.join([name, *additional_content]))
    assert os.path.exists(target)
def _setup_git_repos(self):
"""Set up real git repositories for testing."""
# Set up origin repository
self.run_command('git init --initial-branch=main', self.origin_dir)
self._execute_command(
'git --no-pager init --initial-branch=main', self.origin_dir
)
self._execute_command(
"git --no-pager config user.email 'test@example.com'", self.origin_dir
)
self._execute_command(
"git --no-pager config user.name 'Test User'", self.origin_dir
"git config user.email 'test@example.com'", self.origin_dir
)
self._execute_command("git config user.name 'Test User'", self.origin_dir)
# Create a file and commit it
with open(os.path.join(self.origin_dir, 'file1.txt'), 'w') as f:
f.write('Original content')
self._execute_command('git --no-pager add file1.txt', self.origin_dir)
self._execute_command(
"git --no-pager commit -m 'Initial commit'", self.origin_dir
)
# Set up the initial state...
self.write_file(self.origin_dir, 'unchanged.txt')
self.write_file(self.origin_dir, 'committed_modified.txt')
self.write_file(self.origin_dir, 'staged_modified.txt')
self.write_file(self.origin_dir, 'unstaged_modified.txt')
self.write_file(self.origin_dir, 'committed_delete.txt')
self.write_file(self.origin_dir, 'staged_delete.txt')
self.write_file(self.origin_dir, 'unstaged_delete.txt')
self.run_command("git add . && git commit -m 'Initial Commit'", self.origin_dir)
# Clone the origin repository to local
self.run_command(f'git clone "{self.origin_dir}" "{self.local_dir}"')
self._execute_command(
f'git --no-pager clone {self.origin_dir} {self.local_dir}'
"git config user.email 'test@example.com'", self.local_dir
)
self._execute_command(
"git --no-pager config user.email 'test@example.com'", self.local_dir
)
self._execute_command(
"git --no-pager config user.name 'Test User'", self.local_dir
self._execute_command("git config user.name 'Test User'", self.local_dir)
self.run_command('git checkout -b feature-branch', self.local_dir)
# Setup committed changes...
self.write_file(self.local_dir, 'committed_modified.txt', ('Line 4',))
self.write_file(self.local_dir, 'committed_add.txt')
os.remove(os.path.join(self.local_dir, 'committed_delete.txt'))
self.run_command(
"git add . && git commit -m 'First batch of changes'", self.local_dir
)
# Create a feature branch in the local repository
self._execute_command(
'git --no-pager checkout -b feature-branch', self.local_dir
)
# Setup staged changes...
self.write_file(self.local_dir, 'staged_modified.txt', ('Line 4',))
self.write_file(self.local_dir, 'staged_add.txt')
os.remove(os.path.join(self.local_dir, 'staged_delete.txt'))
self.run_command('git add .', self.local_dir)
# Modify a file and create a new file
with open(os.path.join(self.local_dir, 'file1.txt'), 'w') as f:
f.write('Modified content')
# Setup unstaged changes...
self.write_file(self.local_dir, 'unstaged_modified.txt', ('Line 4',))
self.write_file(self.local_dir, 'unstaged_add.txt')
os.remove(os.path.join(self.local_dir, 'unstaged_delete.txt'))
with open(os.path.join(self.local_dir, 'file2.txt'), 'w') as f:
f.write('New file content')
def setup_nested(self):
nested_1 = Path(self.local_dir, 'nested 1')
nested_1.mkdir()
nested_1 = str(nested_1)
self.run_command('git init --initial-branch=main', nested_1)
self._execute_command("git config user.email 'test@example.com'", nested_1)
self._execute_command("git config user.name 'Test User'", nested_1)
self.write_file(nested_1, 'committed_add.txt')
self.run_command('git add .', nested_1)
self.run_command('git commit -m "Initial Commit"', nested_1)
self.write_file(nested_1, 'staged_add.txt')
# Add and commit file1.txt changes to create a baseline
self._execute_command('git --no-pager add file1.txt', self.local_dir)
self._execute_command(
"git --no-pager commit -m 'Update file1.txt'", self.local_dir
)
# Add and commit file2.txt, then modify it
self._execute_command('git --no-pager add file2.txt', self.local_dir)
self._execute_command(
"git --no-pager commit -m 'Add file2.txt'", self.local_dir
)
# Modify file2.txt and stage it
with open(os.path.join(self.local_dir, 'file2.txt'), 'w') as f:
f.write('Modified new file content')
self._execute_command('git --no-pager add file2.txt', self.local_dir)
# Create a file that will be deleted
with open(os.path.join(self.local_dir, 'file3.txt'), 'w') as f:
f.write('File to be deleted')
self._execute_command('git --no-pager add file3.txt', self.local_dir)
self._execute_command(
"git --no-pager commit -m 'Add file3.txt'", self.local_dir
)
self._execute_command('git --no-pager rm file3.txt', self.local_dir)
# Modify file1.txt again but don't stage it (unstaged change)
with open(os.path.join(self.local_dir, 'file1.txt'), 'w') as f:
f.write('Modified content again')
# Push the feature branch to origin
self._execute_command(
'git --no-pager push -u origin feature-branch', self.local_dir
)
def test_is_git_repo(self):
"""Test that _is_git_repo returns True for a git repository."""
self.assertTrue(self.git_handler._is_git_repo())
# Verify the command was executed
self.assertTrue(
any(
cmd == 'git --no-pager rev-parse --is-inside-work-tree'
for cmd, _ in self.executed_commands
)
)
def test_get_current_file_content(self):
"""Test that _get_current_file_content returns the current content of a file."""
content = self.git_handler._get_current_file_content('file1.txt')
self.assertEqual(content.strip(), 'Modified content again')
# Verify the command was executed
self.assertTrue(
any(cmd == 'cat file1.txt' for cmd, _ in self.executed_commands)
)
nested_2 = Path(self.local_dir, 'nested_2')
nested_2.mkdir()
nested_2 = str(nested_2)
self.run_command('git init --initial-branch=main', nested_2)
self._execute_command("git config user.email 'test@example.com'", nested_2)
self._execute_command("git config user.name 'Test User'", nested_2)
self.write_file(nested_2, 'committed_add.txt')
self.run_command('git add .', nested_2)
self.run_command('git commit -m "Initial Commit"', nested_2)
self.write_file(nested_2, 'unstaged_add.txt')
def test_get_git_changes(self):
"""Test that get_git_changes returns the combined list of changed and untracked files."""
# Create an untracked file
with open(os.path.join(self.local_dir, 'untracked.txt'), 'w') as f:
f.write('Untracked file content')
"""
Test with unpushed commits, staged commits, and unstaged commits
"""
changes = self.git_handler.get_git_changes()
# Create a new file and stage it
with open(os.path.join(self.local_dir, 'new_file2.txt'), 'w') as f:
f.write('New file 2 content')
self._execute_command('git --no-pager add new_file2.txt', self.local_dir)
expected_changes = [
{'status': 'A', 'path': 'committed_add.txt'},
{'status': 'D', 'path': 'committed_delete.txt'},
{'status': 'M', 'path': 'committed_modified.txt'},
{'status': 'A', 'path': 'staged_add.txt'},
{'status': 'D', 'path': 'staged_delete.txt'},
{'status': 'M', 'path': 'staged_modified.txt'},
{'status': 'A', 'path': 'unstaged_add.txt'},
{'status': 'D', 'path': 'unstaged_delete.txt'},
{'status': 'M', 'path': 'unstaged_modified.txt'},
]
if changes != expected_changes:
raise RuntimeError(
'\n'.join(
[
f'incorrect_changes: {changes};',
f'content: {os.listdir(self.local_dir)}',
f'ref: {git_changes.get_valid_ref(self.local_dir)}',
]
)
)
assert changes == expected_changes
def test_get_git_changes_after_push(self):
    """
    After pushing the feature branch, expect only the staged and unstaged changes
    """
    self.run_command('git push -u origin feature-branch', self.local_dir)

    actual = self.git_handler.get_git_changes()

    expected = [
        {'status': status, 'path': path}
        for status, path in (
            ('A', 'staged_add.txt'),
            ('D', 'staged_delete.txt'),
            ('M', 'staged_modified.txt'),
            ('A', 'unstaged_add.txt'),
            ('D', 'unstaged_delete.txt'),
            ('M', 'unstaged_modified.txt'),
        )
    ]
    assert actual == expected
def test_get_git_changes_nested_repos(self):
"""
Test with staged commits, and unstaged commits
"""
self.setup_nested()
changes = self.git_handler.get_git_changes()
self.assertIsNotNone(changes)
# Should include file1.txt (modified), file3.txt (deleted), new_file2.txt (added), and untracked.txt (untracked)
paths = [change['path'] for change in changes]
self.assertIn('file1.txt', paths)
self.assertIn('file3.txt', paths)
self.assertIn('new_file2.txt', paths)
self.assertIn('untracked.txt', paths)
expected_changes = [
{'status': 'A', 'path': 'committed_add.txt'},
{'status': 'D', 'path': 'committed_delete.txt'},
{'status': 'M', 'path': 'committed_modified.txt'},
{'status': 'A', 'path': 'nested 1/staged_add.txt'},
{'status': 'A', 'path': 'nested_2/unstaged_add.txt'},
{'status': 'A', 'path': 'staged_add.txt'},
{'status': 'D', 'path': 'staged_delete.txt'},
{'status': 'M', 'path': 'staged_modified.txt'},
{'status': 'A', 'path': 'unstaged_add.txt'},
{'status': 'D', 'path': 'unstaged_delete.txt'},
{'status': 'M', 'path': 'unstaged_modified.txt'},
]
# Check that the changes include both changed and untracked files
statuses = [change['status'] for change in changes]
self.assertIn('M', statuses) # Modified
self.assertIn('A', statuses) # Added
self.assertIn('D', statuses) # Deleted
assert changes == expected_changes
def test_get_git_changes_multiple_repositories(self):
"""Test that get_git_changes can detect changes in multiple git repositories within a workspace."""
# Create a workspace directory with multiple git repositories
workspace_dir = os.path.join(self.test_dir, 'workspace')
repo1_dir = os.path.join(workspace_dir, 'repo1')
repo2_dir = os.path.join(workspace_dir, 'repo2')
non_git_dir = os.path.join(workspace_dir, 'non_git')
def test_get_git_diff_staged_modified(self):
    """A staged modification shows the old content as original and the new content as modified"""
    actual = self.git_handler.get_git_diff('staged_modified.txt')
    assert actual == {
        'original': 'staged_modified.txt\nLine 1\nLine 2\nLine 3',
        'modified': 'staged_modified.txt\nLine 4',
    }
os.makedirs(workspace_dir, exist_ok=True)
os.makedirs(repo1_dir, exist_ok=True)
os.makedirs(repo2_dir, exist_ok=True)
os.makedirs(non_git_dir, exist_ok=True)
def test_get_git_diff_unchanged(self):
    """An unchanged file has identical original and modified content"""
    content = 'unchanged.txt\nLine 1\nLine 2\nLine 3'
    actual = self.git_handler.get_git_diff('unchanged.txt')
    assert actual == {
        'original': content,
        'modified': content,
    }
# Set up repo1
self._execute_command('git --no-pager init', repo1_dir)
self._execute_command(
"git --no-pager config user.email 'test@example.com'", repo1_dir
)
self._execute_command("git --no-pager config user.name 'Test User'", repo1_dir)
with open(os.path.join(repo1_dir, 'repo1_file.txt'), 'w') as f:
f.write('repo1 content')
self._execute_command('git --no-pager add repo1_file.txt', repo1_dir)
self._execute_command("git --no-pager commit -m 'Initial commit'", repo1_dir)
# Modify the file to create changes
with open(os.path.join(repo1_dir, 'repo1_file.txt'), 'w') as f:
f.write('repo1 modified content')
def test_get_git_diff_unpushed(self):
    """A modification committed locally is still reported against the initial content"""
    actual = self.git_handler.get_git_diff('committed_modified.txt')
    assert actual == {
        'original': 'committed_modified.txt\nLine 1\nLine 2\nLine 3',
        'modified': 'committed_modified.txt\nLine 4',
    }
# Set up repo2
self._execute_command('git --no-pager init', repo2_dir)
self._execute_command(
"git --no-pager config user.email 'test@example.com'", repo2_dir
)
self._execute_command("git --no-pager config user.name 'Test User'", repo2_dir)
with open(os.path.join(repo2_dir, 'repo2_file.txt'), 'w') as f:
f.write('repo2 content')
self._execute_command('git --no-pager add repo2_file.txt', repo2_dir)
self._execute_command("git --no-pager commit -m 'Initial commit'", repo2_dir)
# Add an untracked file
with open(os.path.join(repo2_dir, 'untracked.txt'), 'w') as f:
f.write('untracked content')
def test_get_git_diff_unstaged_add(self):
    """Check the diff for 'unstaged_add.txt': empty 'original', full text as 'modified'."""
    result = self.git_handler.get_git_diff('unstaged_add.txt')

    # 'original' is expected to be the empty string for this file.
    assert result == {
        'original': '',
        'modified': 'unstaged_add.txt\nLine 1\nLine 2\nLine 3',
    }
# Add a file to the non-git directory (should be ignored)
with open(os.path.join(non_git_dir, 'ignored_file.txt'), 'w') as f:
f.write('ignored content')
def test_get_git_changes_fallback(self):
    """Test that get_git_changes falls back to creating a script file when needed.

    The module-level GIT_CHANGES_CMD is patched to a command name that is not
    expected to exist and the handler under test is pointed at that value.
    get_git_changes() must still return the complete list of committed,
    staged and unstaged changes.
    """
    expected_changes = [
        {'status': 'A', 'path': 'committed_add.txt'},
        {'status': 'D', 'path': 'committed_delete.txt'},
        {'status': 'M', 'path': 'committed_modified.txt'},
        {'status': 'A', 'path': 'staged_add.txt'},
        {'status': 'D', 'path': 'staged_delete.txt'},
        {'status': 'M', 'path': 'staged_modified.txt'},
        {'status': 'A', 'path': 'unstaged_add.txt'},
        {'status': 'D', 'path': 'unstaged_delete.txt'},
        {'status': 'M', 'path': 'unstaged_modified.txt'},
    ]

    # Break the git changes command
    with patch(
        'openhands.runtime.utils.git_handler.GIT_CHANGES_CMD',
        'non-existant-command',
    ):
        # Copy the patched (broken) value onto the handler instance.
        self.git_handler.git_changes_cmd = git_handler.GIT_CHANGES_CMD
        # Clear executed commands to start fresh
        self.executed_commands = []
        # Called while the patch is still active so the handler sees the
        # broken module-level command for the whole call.
        changes = self.git_handler.get_git_changes()

    self.assertIsNotNone(changes)
    assert changes == expected_changes
def test_get_git_diff_fallback(self):
    """Check get_git_diff still returns the expected diff when GIT_DIFF_CMD is broken.

    The module-level GIT_DIFF_CMD is patched to 'non-existant-command' and the
    handler's git_diff_cmd is set to that patched value before
    get_git_diff('unchanged.txt') is called. The result must match the same
    content on both the 'original' and 'modified' sides.
    """
    # Break the git diff command
    with patch(
        'openhands.runtime.utils.git_handler.GIT_DIFF_CMD', 'non-existant-command'
    ):
        # Copy the patched (broken) value onto the handler instance.
        self.git_handler.git_diff_cmd = git_handler.GIT_DIFF_CMD
        # NOTE(review): presumably GitHandler recovers via some fallback path
        # when this command fails; that logic is not visible from this test.
        diff = self.git_handler.get_git_diff('unchanged.txt')
        expected_diff = {
            'original': 'unchanged.txt\nLine 1\nLine 2\nLine 3',
            'modified': 'unchanged.txt\nLine 1\nLine 2\nLine 3',
        }
        assert diff == expected_diff

View File

@ -1,120 +0,0 @@
import os
import shlex
import shutil
import subprocess
import tempfile
import unittest

from openhands.runtime.utils.git_handler import CommandResult, GitHandler
class TestGitHandlerWithRealRepo(unittest.TestCase):
    """Run GitHandler against real git repositories created in a temp directory.

    ``setUp`` builds an ``origin`` repository with one commit on ``main`` and a
    ``local`` clone checked out on ``feature-branch`` in which ``file1.txt`` is
    modified but not staged and ``file2.txt`` is new and staged.
    """

    def setUp(self):
        """Create the temporary repositories and the GitHandler under test."""
        # Create temporary directories for our test repositories
        self.test_dir = tempfile.mkdtemp()
        # Register removal right away: unittest does not call tearDown() when
        # setUp() raises, but cleanups registered with addCleanup() still run,
        # so a failure further down cannot leak the directory.
        self.addCleanup(shutil.rmtree, self.test_dir)

        self.origin_dir = os.path.join(self.test_dir, 'origin')
        self.local_dir = os.path.join(self.test_dir, 'local')

        # Create the directories
        os.makedirs(self.origin_dir, exist_ok=True)
        os.makedirs(self.local_dir, exist_ok=True)

        # Set up the git repositories
        self._setup_git_repos()

        # Initialize the GitHandler with a real execute function
        self.git_handler = GitHandler(self._execute_command)
        self.git_handler.set_cwd(self.local_dir)

    def _execute_command(self, cmd, cwd=None):
        """Execute a shell command and return the result.

        Returns a CommandResult built from the command's stdout and return
        code. If launching the command raises, the exception text and exit
        code 1 are returned instead of propagating the error.
        """
        try:
            result = subprocess.run(
                cmd, shell=True, cwd=cwd, capture_output=True, text=True, check=False
            )
            return CommandResult(result.stdout, result.returncode)
        except Exception as e:
            return CommandResult(str(e), 1)

    def _setup_git_repos(self):
        """Set up real git repositories for testing."""
        # Set up origin repository
        self._execute_command('git init --initial-branch=main', self.origin_dir)
        self._execute_command(
            "git config user.email 'test@example.com'", self.origin_dir
        )
        self._execute_command("git config user.name 'Test User'", self.origin_dir)

        # Create a file and commit it
        with open(os.path.join(self.origin_dir, 'file1.txt'), 'w') as f:
            f.write('Original content')
        self._execute_command('git add file1.txt', self.origin_dir)
        self._execute_command("git commit -m 'Initial commit'", self.origin_dir)

        # Clone the origin repository to local. The paths are interpolated
        # into a shell command line, so quote them: a temp directory whose
        # path contains spaces or shell metacharacters would otherwise be
        # split into several arguments.
        origin_arg = shlex.quote(self.origin_dir)
        local_arg = shlex.quote(self.local_dir)
        self._execute_command(f'git clone {origin_arg} {local_arg}')
        self._execute_command(
            "git config user.email 'test@example.com'", self.local_dir
        )
        self._execute_command("git config user.name 'Test User'", self.local_dir)

        # Create a feature branch in the local repository
        self._execute_command('git checkout -b feature-branch', self.local_dir)

        # Modify a file and create a new file
        with open(os.path.join(self.local_dir, 'file1.txt'), 'w') as f:
            f.write('Modified content')
        with open(os.path.join(self.local_dir, 'file2.txt'), 'w') as f:
            f.write('New file content')

        # Add the new file but don't commit anything yet
        self._execute_command('git add file2.txt', self.local_dir)

    def test_is_git_repo(self):
        """Test that _is_git_repo returns True for a git repository."""
        self.assertTrue(self.git_handler._is_git_repo())

    def test_get_ref_content(self):
        """Test that _get_ref_content returns the content from a valid ref."""
        # First commit the changes to make sure we have a valid ref
        self._execute_command('git add file1.txt', self.local_dir)
        self._execute_command("git commit -m 'Update file1.txt'", self.local_dir)

        # Get the content of file1.txt from the main branch
        content = self.git_handler._get_ref_content('file1.txt')
        self.assertEqual(content.strip(), 'Original content')

    def test_get_current_file_content(self):
        """Test that _get_current_file_content returns the current content of a file."""
        content = self.git_handler._get_current_file_content('file1.txt')
        self.assertEqual(content.strip(), 'Modified content')

    def test_get_git_changes(self):
        """Test that get_git_changes returns the combined list of changed and untracked files."""
        # Create an untracked file
        with open(os.path.join(self.local_dir, 'untracked.txt'), 'w') as f:
            f.write('Untracked file content')

        changes = self.git_handler.get_git_changes()
        self.assertIsNotNone(changes)

        # Should include file1.txt (modified), file2.txt (added), and untracked.txt (untracked)
        paths = [change['path'] for change in changes]
        self.assertIn('file1.txt', paths)
        self.assertIn('file2.txt', paths)
        self.assertIn('untracked.txt', paths)

    def test_get_git_diff(self):
        """Test that get_git_diff returns the original and modified content of a file."""
        diff = self.git_handler.get_git_diff('file1.txt')
        self.assertEqual(diff['modified'].strip(), 'Modified content')
        self.assertEqual(diff['original'].strip(), 'Original content')
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()