-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgit_ops.py
More file actions
230 lines (181 loc) · 9.14 KB
/
git_ops.py
File metadata and controls
230 lines (181 loc) · 9.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
"""Git and SSH key operations for AI Coding Gym CLI."""
import os
import re
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import Optional

from .config import ensure_config_dir
def _validate_git_ref(name: str, label: str) -> None:
    """Reject values that are unsafe to splice into a shell-built git command.

    The git helpers in this module build command lines as plain strings and
    run them through a shell, so every interpolated value has to be free of
    anything the shell (or git's option parser) would treat specially.

    Args:
        name: The value to check (branch name, repository URL, directory
            name, ...).
        label: Human-readable name of the value, used in the error message.

    Raises:
        ValueError: If ``name`` is empty, starts with ``-`` (it would be
            parsed as a command-line option), or contains shell
            metacharacters, quotes, redirection characters, a backslash,
            whitespace, or control characters.
    """
    if not name:
        raise ValueError(f"Invalid {label}: {name!r}")
    # A leading dash would make git treat the value as an option.
    if name.startswith("-"):
        raise ValueError(f"Invalid {label}: {name!r}")
    # Whitespace splits the argument; quotes, redirections and backslashes
    # change how the rest of the command line is parsed.
    if re.search(r"""[;&|`$(){}<>\\'"\s\x00-\x1f\x7f]""", name):
        raise ValueError(f"Invalid {label}: {name!r}")
def generate_ssh_key_pair(user_id: str) -> tuple[Path, str]:
    """Return an SSH key pair for the user, creating it when necessary.

    Lookup order:

    1. ``<config dir>/{user_id}_id_rsa`` (the directory returned by
       ``ensure_config_dir()``) is reused if it already exists.
    2. Otherwise, if both ``~/.mcp-keys/{user_id}_id_rsa`` and its ``.pub``
       file exist, they are copied into the config directory and the
       private key is restricted to mode ``0o600``.
    3. Otherwise a new 4096-bit RSA key with an empty passphrase is created
       with ``ssh-keygen``.

    If the private key is present but its ``.pub`` companion is missing, the
    public key is re-derived from the private key instead of failing on the
    final read.

    Args:
        user_id: Identifier used in the key file name and the key comment.

    Returns:
        ``(private_key_path, public_key_content)`` where the public key text
        has surrounding whitespace stripped.

    Raises:
        RuntimeError: If ``ssh-keygen`` exits with a non-zero status while
            generating the key pair or while re-deriving the public key.
    """
    key_dir = ensure_config_dir()
    key_path = key_dir / f"{user_id}_id_rsa"
    pub_key_path = Path(f"{key_path}.pub")

    if not key_path.exists():
        # Check ~/.mcp-keys/ for existing keys matching user_id
        mcp_keys_dir = Path.home() / ".mcp-keys"
        mcp_private = mcp_keys_dir / f"{user_id}_id_rsa"
        mcp_public = mcp_keys_dir / f"{user_id}_id_rsa.pub"

        if mcp_private.exists() and mcp_public.exists():
            shutil.copy2(mcp_private, key_path)
            shutil.copy2(mcp_public, pub_key_path)
            # ssh refuses private keys that are readable by other users.
            key_path.chmod(0o600)
        else:
            result = subprocess.run(
                ["ssh-keygen", "-t", "rsa", "-b", "4096", "-f", str(key_path),
                 "-N", "", "-C", f"aicodinggym-{user_id}"],
                capture_output=True, text=True,
            )
            if result.returncode != 0:
                raise RuntimeError(f"Failed to generate SSH key: {result.stderr}")

    if not pub_key_path.exists():
        # The private key survived but its public half did not (e.g. it was
        # deleted by hand). Re-derive it rather than crashing on read_text().
        result = subprocess.run(
            ["ssh-keygen", "-y", "-f", str(key_path)],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            raise RuntimeError(f"Failed to generate SSH key: {result.stderr}")
        pub_key_path.write_text(result.stdout)

    public_key = pub_key_path.read_text().strip()
    return key_path, public_key
def run_git_command(cmd: str, cwd: str, key_path: Optional[Path] = None) -> subprocess.CompletedProcess:
    """Run a command line through the shell, optionally with a dedicated SSH key.

    Args:
        cmd: Complete command line. It is executed with ``shell=True``, so
            the caller is responsible for quoting any untrusted pieces.
        cwd: Working directory for the command.
        key_path: Private key to use for SSH transport. When given,
            ``GIT_SSH_COMMAND`` is set in the child's environment so git
            authenticates with this key.

    Returns:
        The ``subprocess.CompletedProcess`` with ``stdout``/``stderr``
        captured as text. A non-zero exit status is NOT raised; callers
        inspect ``returncode``.
    """
    env = os.environ.copy()
    if key_path:
        # git hands GIT_SSH_COMMAND to a shell, so the key path must be
        # quoted or a path containing spaces is split into several words.
        # NOTE: StrictHostKeyChecking=no together with a /dev/null
        # known-hosts file turns off SSH host key verification.
        quoted_key = shlex.quote(str(key_path))
        env["GIT_SSH_COMMAND"] = (
            f"ssh -i {quoted_key} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null"
        )
    return subprocess.run(cmd, shell=True, cwd=cwd, capture_output=True, text=True, env=env)
def clone_repo(repo_url: str, branch: str, dest_name: str,
               workspace: str, key_path: Path) -> tuple[bool, str]:
    """Clone a single branch of a repository into ``workspace/dest_name``.

    If the destination already exists and is checked out on ``branch``, the
    branch is updated with ``git pull`` instead. If it exists on any other
    branch (or is not a git repository), nothing is changed and a failure
    is returned.

    Args:
        repo_url: URL of the repository to clone.
        branch: Branch to clone (shallow, depth 1) or to pull.
        dest_name: Directory name to create inside ``workspace``.
        workspace: Directory in which the clone is created.
        key_path: SSH private key used for the network operations.

    Returns:
        ``(success, message)``.
    """
    problem_dir = Path(workspace) / dest_name

    # Everything below is spliced into a shell command line, so quote each
    # caller-supplied value to keep it a single, inert argument.
    quoted_branch = shlex.quote(branch)

    if problem_dir.exists():
        # Check if already on the correct branch
        result = run_git_command("git rev-parse --abbrev-ref HEAD", str(problem_dir))
        if result.returncode == 0 and result.stdout.strip() == branch:
            pull = run_git_command(f"git pull origin {quoted_branch}", str(problem_dir), key_path)
            if pull.returncode != 0:
                return False, f"Git pull failed:\n{pull.stderr}"
            return True, f"Already exists. Updated to latest version.\nRepository: {problem_dir}\nBranch: {branch}"
        return False, (
            f"Directory {problem_dir} already exists with different content.\n"
            "Remove it first or use --workspace-dir to specify a different location."
        )

    # "--" ends option parsing so a URL or directory name that starts with
    # a dash cannot be read as a git option.
    cmd = (
        f"git clone --single-branch --branch {quoted_branch} --depth 1 "
        f"-- {shlex.quote(repo_url)} {shlex.quote(dest_name)}"
    )
    result = run_git_command(cmd, workspace, key_path)
    if result.returncode != 0:
        return False, f"Git clone failed:\n{result.stderr}\nMake sure the branch '{branch}' exists in the repository."

    return True, f"Cloned to: {problem_dir}\nBranch: {branch}"
def clone_repo_cr(repo_url: str, base_branch: str, head_branch: str,
                  dest_name: str, workspace: str,
                  key_path: Optional[Path] = None) -> tuple[bool, str]:
    """Clone a code-review repository with both its base and head branches.

    Fresh clone: the base branch is cloned shallowly (depth 50), the head
    branch is fetched, a local head branch is created from it, and the head
    branch is checked out.

    Existing directory: both branches are fetched, the local branches are
    moved to the fetched commits, and the head branch is checked out.

    Args:
        repo_url: URL of the repository to clone.
        base_branch: Branch the review is made against.
        head_branch: Branch containing the code under review.
        dest_name: Directory name to create inside ``workspace``.
        workspace: Directory in which the clone is created.
        key_path: Optional SSH private key for the network operations.

    Returns:
        ``(success, message)``.

    Raises:
        ValueError: If any of the branch names, the URL, or the destination
            name fails ``_validate_git_ref``.
    """
    _validate_git_ref(base_branch, "base_branch")
    _validate_git_ref(head_branch, "head_branch")
    _validate_git_ref(repo_url, "repo_url")
    _validate_git_ref(dest_name, "dest_name")

    problem_dir = Path(workspace) / dest_name
    repo = str(problem_dir)

    if problem_dir.exists():
        # Already cloned — fetch latest for both branches. Do all network
        # work first and remember the fetched commits, so a failed fetch
        # leaves the checkout exactly as it was.
        fetched: dict[str, str] = {}
        for branch in (base_branch, head_branch):
            result = run_git_command(f"git fetch origin {branch}", repo, key_path)
            if result.returncode != 0:
                return False, f"Git fetch failed for {branch}:\n{result.stderr}"
            result = run_git_command("git rev-parse FETCH_HEAD", repo)
            if result.returncode != 0:
                return False, f"Failed to update branch {branch}:\n{result.stderr}"
            fetched[branch] = result.stdout.strip()

        # `git branch -f` refuses to move the branch that is currently
        # checked out, which is normally head_branch after a previous run.
        # Detaching HEAD keeps the working tree as-is and lifts that limit.
        result = run_git_command("git checkout --detach", repo)
        if result.returncode != 0:
            return False, f"Failed to update branch {head_branch}:\n{result.stderr}"

        for branch, commit in fetched.items():
            result = run_git_command(f"git branch -f {branch} {commit}", repo)
            if result.returncode != 0:
                return False, f"Failed to update branch {branch}:\n{result.stderr}"

        result = run_git_command(f"git checkout {head_branch}", repo)
        if result.returncode != 0:
            return False, f"Failed to checkout head branch '{head_branch}':\n{result.stderr}"
        return True, (
            f"Already exists. Updated both branches.\n"
            f"Repository: {problem_dir}\n"
            f"Branches: {base_branch}, {head_branch}"
        )

    # Clone base branch (shallow); depth 50 needed for diffing between branches
    cmd = f"git clone --single-branch --branch {base_branch} --depth 50 {repo_url} {dest_name}"
    result = run_git_command(cmd, workspace, key_path)
    if result.returncode != 0:
        return False, f"Git clone failed:\n{result.stderr}"

    # Fetch head branch
    fetch_cmd = f"git fetch origin {head_branch}"
    result = run_git_command(fetch_cmd, repo, key_path)
    if result.returncode != 0:
        return False, f"Failed to fetch head branch '{head_branch}':\n{result.stderr}"

    # Create local head branch pointing at the fetched commit
    result = run_git_command(f"git branch -f {head_branch} FETCH_HEAD", repo)
    if result.returncode != 0:
        return False, f"Failed to create branch {head_branch}:\n{result.stderr}"

    # Check out head branch so the user starts on the code being reviewed
    result = run_git_command(f"git checkout {head_branch}", repo)
    if result.returncode != 0:
        return False, f"Failed to checkout head branch '{head_branch}':\n{result.stderr}"

    return True, (
        f"Cloned to: {problem_dir}\n"
        f"Branches: {base_branch}, {head_branch}"
    )
def add_commit_push(problem_dir: str, branch: str, key_path: Path,
                    message: str, force: bool = False) -> tuple[bool, str, str]:
    """Stage, commit, and push all changes in ``problem_dir``.

    Everything except the ``.github`` directory is staged. If nothing ends
    up staged, no commit is made and a failure is returned.

    Args:
        problem_dir: Path of the git working tree.
        branch: Remote branch on ``origin`` to push to.
        key_path: SSH private key used for the push.
        message: Commit message, used verbatim.
        force: When true, push with ``--force-with-lease``.

    Returns:
        ``(success, message, commit_hash)``. ``commit_hash`` is empty when
        no commit was created, and is still populated when the commit
        succeeded but the push failed.
    """
    pdir = Path(problem_dir)

    # Stage all changes except .github
    result = run_git_command("git add -A -- . ':(exclude).github'", str(pdir))
    if result.returncode != 0:
        return False, f"Git add failed:\n{result.stderr}", ""

    # Check for staged changes
    status = run_git_command("git diff --cached --name-only", str(pdir))
    if status.returncode != 0:
        # Without this check a failing diff looks like "nothing to commit".
        return False, f"Git diff failed:\n{status.stderr}", ""
    if not status.stdout.strip():
        return False, "No changes to commit. Your working directory is clean.", ""

    # Commit. The message is free-form user text; quote it as one literal
    # shell word so "$(...)", backticks and backslashes are neither executed
    # nor mangled.
    result = run_git_command(f"git commit -m {shlex.quote(message)}", str(pdir))
    if result.returncode != 0:
        return False, f"Git commit failed:\n{result.stderr}", ""

    # Get commit hash
    hash_result = run_git_command("git rev-parse HEAD", str(pdir))
    commit_hash = hash_result.stdout.strip()

    # Push
    push_flag = "--force-with-lease " if force else ""
    result = run_git_command(
        f"git push {push_flag}origin {shlex.quote(branch)}", str(pdir), key_path
    )
    if result.returncode != 0:
        return False, f"Git push failed:\n{result.stderr}", commit_hash

    return True, "Committed and pushed successfully.", commit_hash
def reset_to_setup_commit(problem_dir: str) -> tuple[bool, str]:
    """Hard-reset the repository to its 'Setup SWE-bench instance:' commit.

    The history is listed oldest-first and the first commit whose subject
    starts with the setup prefix is used as the target. The working tree is
    then hard-reset to it and untracked files and directories are removed.

    Args:
        problem_dir: Path of the git working tree.

    Returns:
        ``(success, message)``.
    """
    setup_prefix = "Setup SWE-bench instance:"

    history = run_git_command("git log --format=%H:%s --reverse", problem_dir)
    if history.returncode != 0:
        return False, f"Git log failed:\n{history.stderr}"

    def _setup_hashes():
        # Each log line is "<hash>:<subject>"; lines without a colon are skipped.
        for entry in history.stdout.splitlines():
            sha, colon, subject = entry.partition(":")
            if colon and subject.strip().startswith(setup_prefix):
                yield sha.strip()

    # Oldest matching commit wins because the log is listed in reverse.
    target = next(_setup_hashes(), None)
    if not target:
        return False, (
            f"Could not find the original setup commit.\n"
            f"Expected a commit message starting with '{setup_prefix}'."
        )

    steps = (
        (f"git reset --hard {target}", "Git reset failed:\n"),
        ("git clean -fd", "Git clean failed:\n"),
    )
    for command, failure_prefix in steps:
        outcome = run_git_command(command, problem_dir)
        if outcome.returncode != 0:
            return False, f"{failure_prefix}{outcome.stderr}"

    return True, f"Reset to setup commit {target[:8]}.\nLocal changes discarded and untracked files removed."
def check_tool_installed(tool_name: str) -> bool:
    """Report whether ``tool_name`` resolves to an executable on PATH.

    Args:
        tool_name: Name of the command-line program to look up.

    Returns:
        True if ``shutil.which`` finds the tool, False otherwise.
    """
    resolved = shutil.which(tool_name)
    if resolved is None:
        return False
    return True