mirror of
https://github.com/browseros-ai/BrowserOS.git
synced 2026-05-21 21:05:09 +00:00
634 lines
21 KiB
Python
634 lines
21 KiB
Python
"""
|
|
Shared utilities for Dev CLI operations
|
|
|
|
This module provides robust utilities for git operations, diff parsing,
|
|
and patch management with comprehensive error handling.
|
|
"""
|
|
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import click
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Optional, List, Dict, Tuple, NamedTuple
|
|
from enum import Enum
|
|
from dataclasses import dataclass
|
|
from context import BuildContext
|
|
from utils import log_info, log_error, log_success, log_warning
|
|
|
|
|
|
class FileOperation(Enum):
|
|
"""Types of file operations in a diff"""
|
|
ADD = "add"
|
|
MODIFY = "modify"
|
|
DELETE = "delete"
|
|
RENAME = "rename"
|
|
COPY = "copy"
|
|
BINARY = "binary"
|
|
|
|
|
|
@dataclass
|
|
class FilePatch:
|
|
"""Represents a single file's patch information"""
|
|
file_path: str
|
|
operation: FileOperation
|
|
old_path: Optional[str] = None # For renames/copies
|
|
patch_content: Optional[str] = None
|
|
is_binary: bool = False
|
|
similarity: Optional[int] = None # For renames (percentage)
|
|
|
|
|
|
class GitError(Exception):
|
|
"""Custom exception for git operations"""
|
|
pass
|
|
|
|
|
|
def run_git_command(cmd: List[str], cwd: Path,
|
|
capture: bool = True, check: bool = False,
|
|
timeout: Optional[int] = None,
|
|
binary_output: bool = False) -> subprocess.CompletedProcess:
|
|
"""Run a git command and return the result
|
|
|
|
Args:
|
|
cmd: Command to run
|
|
cwd: Working directory
|
|
capture: Whether to capture output
|
|
check: Whether to raise on non-zero return
|
|
timeout: Command timeout in seconds
|
|
binary_output: If True, handle binary output (don't decode as text)
|
|
|
|
Returns:
|
|
CompletedProcess result
|
|
|
|
Raises:
|
|
GitError: If command fails and check=True
|
|
"""
|
|
try:
|
|
# For commands that might output binary data (like git diff with binary files),
|
|
# we need to handle them specially
|
|
if binary_output or ('diff' in cmd and '--binary' not in cmd):
|
|
# First try with text mode
|
|
try:
|
|
result = subprocess.run(
|
|
cmd,
|
|
cwd=cwd,
|
|
capture_output=capture,
|
|
text=True,
|
|
check=False,
|
|
timeout=timeout or 60,
|
|
errors='replace' # Replace invalid UTF-8 sequences
|
|
)
|
|
except UnicodeDecodeError:
|
|
# Fall back to binary mode
|
|
result = subprocess.run(
|
|
cmd,
|
|
cwd=cwd,
|
|
capture_output=capture,
|
|
text=False,
|
|
check=False,
|
|
timeout=timeout or 60
|
|
)
|
|
# Convert to text with error handling
|
|
if result.stdout:
|
|
result.stdout = result.stdout.decode('utf-8', errors='replace')
|
|
if result.stderr:
|
|
result.stderr = result.stderr.decode('utf-8', errors='replace')
|
|
else:
|
|
result = subprocess.run(
|
|
cmd,
|
|
cwd=cwd,
|
|
capture_output=capture,
|
|
text=True,
|
|
check=False,
|
|
timeout=timeout or 60
|
|
)
|
|
|
|
if check and result.returncode != 0:
|
|
error_msg = result.stderr or result.stdout or "Unknown error"
|
|
raise GitError(f"Git command failed: {' '.join(cmd)}\nError: {error_msg}")
|
|
|
|
return result
|
|
except subprocess.TimeoutExpired:
|
|
log_error(f"Git command timed out after {timeout} seconds: {' '.join(cmd)}")
|
|
raise GitError(f"Command timed out: {' '.join(cmd)}")
|
|
except Exception as e:
|
|
log_error(f"Failed to run git command: {' '.join(cmd)}")
|
|
raise GitError(f"Command failed: {e}")
|
|
|
|
|
|
def validate_git_repository(path: Path) -> bool:
|
|
"""Validate that a path is a git repository"""
|
|
try:
|
|
result = run_git_command(
|
|
['git', 'rev-parse', '--git-dir'],
|
|
cwd=path,
|
|
check=False
|
|
)
|
|
return result.returncode == 0
|
|
except GitError:
|
|
return False
|
|
|
|
|
|
def validate_commit_exists(commit_hash: str, chromium_src: Path) -> bool:
|
|
"""Validate that a commit exists in the repository"""
|
|
try:
|
|
result = run_git_command(
|
|
['git', 'rev-parse', '--verify', f'{commit_hash}^{{commit}}'],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
log_error(f"Commit '{commit_hash}' not found in repository")
|
|
return False
|
|
return True
|
|
except GitError as e:
|
|
log_error(f"Failed to validate commit: {e}")
|
|
return False
|
|
|
|
|
|
def get_commit_changed_files(commit_hash: str, chromium_src: Path) -> List[str]:
|
|
"""Get list of files changed in a commit"""
|
|
try:
|
|
result = run_git_command(
|
|
['git', 'diff-tree', '--no-commit-id', '--name-only', '-r', commit_hash],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
log_error(f"Failed to get changed files for commit {commit_hash}")
|
|
return []
|
|
|
|
files = [f.strip() for f in result.stdout.strip().split('\n') if f.strip()]
|
|
return files
|
|
except GitError as e:
|
|
log_error(f"Error getting changed files: {e}")
|
|
return []
|
|
|
|
|
|
def parse_diff_output(diff_output: str) -> Dict[str, FilePatch]:
|
|
"""
|
|
Parse git diff output into individual file patches with full metadata.
|
|
|
|
Handles:
|
|
- Regular file modifications
|
|
- New files
|
|
- Deleted files
|
|
- Binary files
|
|
- File renames
|
|
- File copies
|
|
- Mode changes
|
|
|
|
Returns:
|
|
Dict mapping file path to FilePatch objects
|
|
"""
|
|
patches = {}
|
|
current_file = None
|
|
current_patch_lines = []
|
|
current_operation = FileOperation.MODIFY
|
|
is_binary = False
|
|
old_path = None
|
|
similarity = None
|
|
|
|
lines = diff_output.splitlines()
|
|
i = 0
|
|
|
|
while i < len(lines):
|
|
line = lines[i]
|
|
|
|
# Start of a new file diff
|
|
if line.startswith('diff --git'):
|
|
# Save previous patch if exists
|
|
if current_file and current_patch_lines:
|
|
patch_content = '\n'.join(current_patch_lines) if not is_binary else None
|
|
patches[current_file] = FilePatch(
|
|
file_path=current_file,
|
|
operation=current_operation,
|
|
old_path=old_path,
|
|
patch_content=patch_content,
|
|
is_binary=is_binary,
|
|
similarity=similarity
|
|
)
|
|
|
|
# Parse file paths from diff line
|
|
match = re.match(r'diff --git a/(.*) b/(.*)', line)
|
|
if match:
|
|
old_file = match.group(1)
|
|
new_file = match.group(2)
|
|
current_file = new_file
|
|
current_patch_lines = [line]
|
|
current_operation = FileOperation.MODIFY
|
|
is_binary = False
|
|
old_path = None
|
|
similarity = None
|
|
else:
|
|
log_warning(f"Could not parse diff line: {line}")
|
|
current_file = None
|
|
current_patch_lines = []
|
|
|
|
i += 1
|
|
continue
|
|
|
|
# Check for file metadata
|
|
if current_file:
|
|
if line.startswith('deleted file'):
|
|
current_operation = FileOperation.DELETE
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('new file'):
|
|
current_operation = FileOperation.ADD
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('similarity index'):
|
|
# Extract similarity percentage for renames
|
|
match = re.match(r'similarity index (\d+)%', line)
|
|
if match:
|
|
similarity = int(match.group(1))
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('rename from'):
|
|
current_operation = FileOperation.RENAME
|
|
old_path = line[12:].strip() # Remove 'rename from '
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('rename to'):
|
|
# Confirm rename operation
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('copy from'):
|
|
current_operation = FileOperation.COPY
|
|
old_path = line[10:].strip() # Remove 'copy from '
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('copy to'):
|
|
# Confirm copy operation
|
|
current_patch_lines.append(line)
|
|
elif line == 'Binary files differ' or line.startswith('Binary files'):
|
|
is_binary = True
|
|
current_operation = FileOperation.BINARY if current_operation == FileOperation.MODIFY else current_operation
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('index ') or line.startswith('---') or line.startswith('+++'):
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('@@'):
|
|
# Hunk header
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('+') or line.startswith('-') or line.startswith(' '):
|
|
# Actual diff content
|
|
current_patch_lines.append(line)
|
|
elif line.startswith('\\'):
|
|
# Special markers like "\ No newline at end of file"
|
|
current_patch_lines.append(line)
|
|
else:
|
|
# Other content
|
|
current_patch_lines.append(line)
|
|
|
|
i += 1
|
|
|
|
# Save last patch
|
|
if current_file and current_patch_lines:
|
|
patch_content = '\n'.join(current_patch_lines) if not is_binary else None
|
|
patches[current_file] = FilePatch(
|
|
file_path=current_file,
|
|
operation=current_operation,
|
|
old_path=old_path,
|
|
patch_content=patch_content,
|
|
is_binary=is_binary,
|
|
similarity=similarity
|
|
)
|
|
|
|
return patches
|
|
|
|
|
|
def write_patch_file(ctx: BuildContext, file_path: str, patch_content: str) -> bool:
|
|
"""
|
|
Write a patch file to chromium_src directory structure.
|
|
|
|
Args:
|
|
ctx: Build context
|
|
file_path: Path of the file being patched
|
|
patch_content: The patch content to write
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
# Construct output path
|
|
output_path = ctx.get_patch_path_for_file(file_path)
|
|
|
|
# Create directory structure
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
# Ensure patch ends with newline
|
|
if patch_content and not patch_content.endswith('\n'):
|
|
patch_content += '\n'
|
|
|
|
output_path.write_text(patch_content, encoding='utf-8')
|
|
log_success(f" Written: {output_path.relative_to(ctx.root_dir)}")
|
|
return True
|
|
except Exception as e:
|
|
log_error(f" Failed to write {output_path}: {e}")
|
|
return False
|
|
|
|
|
|
def create_deletion_marker(ctx: BuildContext, file_path: str) -> bool:
|
|
"""
|
|
Create a marker file for deleted files.
|
|
|
|
Args:
|
|
ctx: Build context
|
|
file_path: Path of the deleted file
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
marker_path = ctx.get_dev_patches_dir() / file_path
|
|
marker_path = marker_path.with_suffix(marker_path.suffix + '.deleted')
|
|
|
|
marker_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
marker_content = f"File deleted in patch\nOriginal path: {file_path}\n"
|
|
marker_path.write_text(marker_content, encoding='utf-8')
|
|
log_warning(f" Marked deleted: {marker_path.relative_to(ctx.root_dir)}")
|
|
return True
|
|
except Exception as e:
|
|
log_error(f" Failed to create deletion marker: {e}")
|
|
return False
|
|
|
|
|
|
def create_binary_marker(ctx: BuildContext, file_path: str, operation: FileOperation) -> bool:
|
|
"""
|
|
Create a marker file for binary files.
|
|
|
|
Args:
|
|
ctx: Build context
|
|
file_path: Path of the binary file
|
|
operation: The operation type
|
|
|
|
Returns:
|
|
True if successful, False otherwise
|
|
"""
|
|
marker_path = ctx.get_dev_patches_dir() / file_path
|
|
marker_path = marker_path.with_suffix(marker_path.suffix + '.binary')
|
|
|
|
marker_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
try:
|
|
marker_content = f"Binary file\nOperation: {operation.value}\nOriginal path: {file_path}\n"
|
|
marker_path.write_text(marker_content, encoding='utf-8')
|
|
log_warning(f" Binary file marked: {marker_path.relative_to(ctx.root_dir)}")
|
|
return True
|
|
except Exception as e:
|
|
log_error(f" Failed to create binary marker: {e}")
|
|
return False
|
|
|
|
|
|
def apply_single_patch(patch_path: Path, chromium_src: Path,
|
|
interactive: bool = True) -> Tuple[bool, str]:
|
|
"""
|
|
Apply a single patch file to chromium source with multiple strategies.
|
|
|
|
Tries in order:
|
|
1. Standard git apply
|
|
2. Three-way merge
|
|
3. Patch command fallback
|
|
4. Interactive conflict resolution
|
|
|
|
Returns:
|
|
Tuple of (success, message)
|
|
"""
|
|
if not patch_path.exists():
|
|
return False, f"Patch file not found: {patch_path}"
|
|
|
|
# Check if it's a deletion marker
|
|
if patch_path.suffix == '.deleted':
|
|
# Handle file deletion
|
|
file_path = patch_path.stem
|
|
target_file = chromium_src / file_path
|
|
if target_file.exists():
|
|
try:
|
|
target_file.unlink()
|
|
return True, f"Deleted: {file_path}"
|
|
except Exception as e:
|
|
return False, f"Failed to delete {file_path}: {e}"
|
|
else:
|
|
return True, f"Already deleted: {file_path}"
|
|
|
|
# Check if it's a binary marker
|
|
if patch_path.suffix == '.binary':
|
|
return False, f"Binary file patch not supported: {patch_path.name}"
|
|
|
|
# Try standard apply
|
|
result = run_git_command(
|
|
['git', 'apply', '-p1', str(patch_path)],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
return True, f"Applied: {patch_path.name}"
|
|
|
|
# Try 3-way merge
|
|
result = run_git_command(
|
|
['git', 'apply', '-p1', '--3way', str(patch_path)],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
return True, f"Applied (3-way): {patch_path.name}"
|
|
|
|
# Try with whitespace options
|
|
result = run_git_command(
|
|
['git', 'apply', '-p1', '--whitespace=fix', str(patch_path)],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
return True, f"Applied (whitespace fixed): {patch_path.name}"
|
|
|
|
# Handle conflict
|
|
if interactive:
|
|
return handle_patch_conflict(patch_path, chromium_src, result.stderr)
|
|
else:
|
|
return False, f"Failed: {patch_path.name} - {result.stderr}"
|
|
|
|
|
|
def handle_patch_conflict(patch_path: Path, chromium_src: Path,
|
|
error_msg: str = "") -> Tuple[bool, str]:
|
|
"""Handle patch conflict interactively with detailed options"""
|
|
click.echo(f"\n{click.style('CONFLICT:', fg='red', bold=True)} {patch_path}")
|
|
|
|
if error_msg:
|
|
# Parse error message for more context
|
|
lines = error_msg.strip().split('\n')
|
|
for line in lines[:5]: # Show first 5 lines of error
|
|
click.echo(f" {line}")
|
|
|
|
click.echo("\nOptions:")
|
|
click.echo(" 1) Fix manually and continue")
|
|
click.echo(" 2) Skip this patch")
|
|
click.echo(" 3) Try with reduced context (--unidiff-zero)")
|
|
click.echo(" 4) Show patch content")
|
|
click.echo(" 5) Abort all remaining patches")
|
|
|
|
while True:
|
|
choice = click.prompt("Enter choice (1-5)", type=str)
|
|
|
|
if choice == "1":
|
|
click.prompt("Fix the conflicts manually and press Enter to continue")
|
|
return True, f"Manually fixed: {patch_path.name}"
|
|
elif choice == "2":
|
|
return True, f"Skipped: {patch_path.name}"
|
|
elif choice == "3":
|
|
# Try with reduced context
|
|
result = run_git_command(
|
|
['git', 'apply', '-p1', '--unidiff-zero', str(patch_path)],
|
|
cwd=chromium_src
|
|
)
|
|
if result.returncode == 0:
|
|
return True, f"Applied (reduced context): {patch_path.name}"
|
|
else:
|
|
click.echo("Failed with reduced context too")
|
|
continue
|
|
elif choice == "4":
|
|
# Show patch content
|
|
try:
|
|
content = patch_path.read_text()
|
|
lines = content.split('\n')
|
|
# Show first 50 lines
|
|
click.echo("\n--- Patch Content (first 50 lines) ---")
|
|
for line in lines[:50]:
|
|
click.echo(line)
|
|
if len(lines) > 50:
|
|
click.echo(f"... and {len(lines) - 50} more lines")
|
|
click.echo("--- End of Preview ---\n")
|
|
except Exception as e:
|
|
click.echo(f"Failed to read patch: {e}")
|
|
continue
|
|
elif choice == "5":
|
|
return False, "Aborted by user"
|
|
else:
|
|
click.echo("Invalid choice. Please enter 1-5.")
|
|
|
|
|
|
def create_git_commit(chromium_src: Path, message: str) -> bool:
|
|
"""Create a git commit with the given message"""
|
|
# Check if there are changes to commit
|
|
result = run_git_command(
|
|
['git', 'status', '--porcelain'],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if not result.stdout.strip():
|
|
log_warning("Nothing to commit, working tree clean")
|
|
return True
|
|
|
|
# Stage all changes
|
|
result = run_git_command(
|
|
['git', 'add', '-A'],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
log_error("Failed to stage changes")
|
|
return False
|
|
|
|
# Create commit
|
|
result = run_git_command(
|
|
['git', 'commit', '-m', message],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
if "nothing to commit" in result.stdout:
|
|
log_warning("Nothing to commit")
|
|
else:
|
|
log_error(f"Failed to create commit: {result.stderr}")
|
|
return False
|
|
|
|
log_success(f"Created commit: {message}")
|
|
return True
|
|
|
|
|
|
def get_commit_info(commit_hash: str, chromium_src: Path) -> Optional[Dict[str, str]]:
|
|
"""Get detailed information about a commit"""
|
|
try:
|
|
# Get commit info in a structured format
|
|
result = run_git_command(
|
|
['git', 'show', '--format=%H%n%an%n%ae%n%at%n%s%n%b', '--no-patch', commit_hash],
|
|
cwd=chromium_src
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
return None
|
|
|
|
lines = result.stdout.strip().split('\n')
|
|
if len(lines) >= 5:
|
|
return {
|
|
'hash': lines[0],
|
|
'author_name': lines[1],
|
|
'author_email': lines[2],
|
|
'timestamp': lines[3],
|
|
'subject': lines[4],
|
|
'body': '\n'.join(lines[5:]) if len(lines) > 5 else ''
|
|
}
|
|
return None
|
|
except GitError:
|
|
return None
|
|
|
|
|
|
def prompt_yes_no(question: str, default: bool = False) -> bool:
|
|
"""Prompt user for yes/no question"""
|
|
default_str = "Y/n" if default else "y/N"
|
|
result = click.prompt(f"{question} [{default_str}]",
|
|
type=str, default="y" if default else "n")
|
|
return result.lower() in ('y', 'yes')
|
|
|
|
|
|
def log_extraction_summary(file_patches: Dict[str, FilePatch]):
|
|
"""Log a detailed summary of extracted patches"""
|
|
total = len(file_patches)
|
|
|
|
# Count by operation type
|
|
operations = {op: 0 for op in FileOperation}
|
|
binary_count = 0
|
|
|
|
for patch in file_patches.values():
|
|
operations[patch.operation] += 1
|
|
if patch.is_binary:
|
|
binary_count += 1
|
|
|
|
click.echo("\n" + click.style("Extraction Summary", fg='green', bold=True))
|
|
click.echo("=" * 60)
|
|
click.echo(f"Total files: {total}")
|
|
click.echo("-" * 40)
|
|
|
|
if operations[FileOperation.ADD] > 0:
|
|
click.echo(f"New files: {operations[FileOperation.ADD]}")
|
|
if operations[FileOperation.MODIFY] > 0:
|
|
click.echo(f"Modified: {operations[FileOperation.MODIFY]}")
|
|
if operations[FileOperation.DELETE] > 0:
|
|
click.echo(f"Deleted: {operations[FileOperation.DELETE]}")
|
|
if operations[FileOperation.RENAME] > 0:
|
|
click.echo(f"Renamed: {operations[FileOperation.RENAME]}")
|
|
if operations[FileOperation.COPY] > 0:
|
|
click.echo(f"Copied: {operations[FileOperation.COPY]}")
|
|
if binary_count > 0:
|
|
click.echo(f"Binary files: {binary_count}")
|
|
|
|
click.echo("=" * 60)
|
|
|
|
|
|
def log_apply_summary(results: List[Tuple[str, bool, str]]):
|
|
"""Log a detailed summary of applied patches"""
|
|
total = len(results)
|
|
successful = sum(1 for _, success, _ in results if success)
|
|
failed = total - successful
|
|
|
|
click.echo("\n" + click.style("Apply Summary",
|
|
fg='green' if failed == 0 else 'yellow',
|
|
bold=True))
|
|
click.echo("=" * 60)
|
|
click.echo(f"Total patches: {total}")
|
|
click.echo(f"Successful: {successful}")
|
|
click.echo(f"Failed: {failed}")
|
|
click.echo("=" * 60)
|
|
|
|
if failed > 0:
|
|
click.echo("\n" + click.style("Failed patches:", fg='red', bold=True))
|
|
for file_path, success, message in results:
|
|
if not success:
|
|
click.echo(f" ✗ {file_path}: {message}") |