mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-21 03:39:54 +00:00
5e743559e0
* fix(lint): skip per-file shell linter when LSP will handle the file `_check_lint` ran `npx tsc --noEmit FILE.ts` after every `.ts`/`.tsx` edit. `tsc` ignores `tsconfig.json` when given an explicit file argument (documented quirk) and defaults to no-lib / ES5, so every ES2015+ stdlib reference reports as missing: - `Cannot find global value 'Promise'` - `Cannot find name 'Map' / 'Set' / 'ReadonlySet' / 'Iterable'` - `Property 'isFinite' does not exist on type 'NumberConstructor'` - `Module 'phaser' can only be default-imported using esModuleInterop` - `import.meta is only allowed when --module is es2020+` On real TypeScript projects this floods the `lint` field on WriteResult / PatchResult with up to 25K tokens of false positives per edit. The delta filter in `_check_lint_delta` is supposed to mask them, but a tiny edit shifts line numbers and every phantom resurfaces as "introduced by this edit". The result is a 1MB+ phantom-error dump on every patch that eats the agent's context budget. Same shape for `.go` (`go vet` outside a module) and `.rs` (`rustfmt --check` outside a Cargo project). PR #24168 added an LSP tier on top of this — real `tsserver` / `gopls` / `rust-analyzer` diagnostics surface in the separate `lsp_diagnostics` field. But the broken shell linter kept running underneath, so the phantom-error dump kept happening even when LSP was giving us a clean authoritative signal. This change short-circuits the shell linter for the structurally-broken extensions (`.ts`, `.tsx`, `.go`, `.rs`) when an LSP server is active and claims the file via `LSPService.enabled_for(path)`. The LSP tier runs as before and carries the real diagnostics in `lsp_diagnostics`. Other shell linters (`py_compile`, `node --check`) keep running unconditionally — they're fast, file-local, and correct. Default behavior (LSP disabled, LSP misconfigured, remote backend, file outside a workspace) is unchanged — the existing fallback paths trigger when `_lsp_will_handle` returns False, so users who haven't opted into LSP get the same shell-linter behavior they had before. Drive-by: `.tsx` was missing from the `LINTERS` table entirely, so TS React files got no post-edit syntax check at all. Added it for symmetry; in practice it now hits the LSP-skip path. Tests: - `tests/agent/lsp/test_shell_linter_lsp_skip.py` — 14 tests covering: * skip happens for each redundant extension when LSP claims the file (asserted by patching `_exec` to raise on any shell-linter call) * shell linter still runs when LSP is inactive (regression guard) * `.py` / `.js` continue to run unconditionally even with LSP active * `_lsp_will_handle` is exception-safe: returns False on None service, remote backend, or `enabled_for` raising * `.tsx` is in both `LINTERS` and `_SHELL_LINTER_LSP_REDUNDANT` - All pre-existing tests in `tests/agent/lsp/` and `tests/tools/test_file_operations*.py` still pass (233/233). * fix(lint): address Copilot review on #29054 Two fixes from copilot-pull-request-reviewer on PR #29054: 1. `.tsx` regression with LSP disabled (https://github.com/NousResearch/hermes-agent/pull/29054#discussion_r3271017282) The first revision added `.tsx` to the `LINTERS` table so that TypeScript React files would hit the LSP skip path. Side effect: when LSP is *disabled* (the default), `.tsx` edits would suddenly run `npx tsc --noEmit FILE.tsx` and inherit the same phantom-error dump this PR is supposed to fix. Pre-PR behavior was implicit `skipped` (no `LINTERS` entry); restore that. - Remove `.tsx` from `LINTERS`. - Remove `.tsx` from `_SHELL_LINTER_LSP_REDUNDANT` (the skip path is unreachable without a `LINTERS` entry — falls through to `ext not in LINTERS` first). - When LSP IS enabled, `.tsx` is still covered by the LSP tier via `_maybe_lsp_diagnostics` (typescript-language-server's `extensions` tuple includes `.tsx`), so the diagnostics still surface — just on the `lsp_diagnostics` channel, not `lint`. - Update test_shell_linter_lsp_skip.py to reflect this contract (drop `.tsx` from the parametrize lists; add `test_tsx_stays_out_of_linters_table_for_default_compatibility` and `test_tsx_default_check_lint_returns_skipped`). 2. V4A patches dropped `WriteResult.lsp_diagnostics` (https://github.com/NousResearch/hermes-agent/pull/29054#discussion_r3271017295) `tools/patch_parser.py::apply_v4a_operations` calls `file_ops.write_file()` per operation, then calls `_check_lint()` directly afterwards — but never propagates `WriteResult.lsp_diagnostics` to the `PatchResult`. The shell-linter skip introduced in this PR makes the gap visible: a `.ts` / `.go` / `.rs` V4A patch with LSP active would return `lint = {f: {skipped: True}}` and zero diagnostics from any channel. - `_apply_add` and `_apply_update` now return `Tuple[bool, str, Optional[str]]` where the third element is `WriteResult.lsp_diagnostics` (or `None` on failure / no diags). - `_apply_delete` and `_apply_move` stay 2-tuples — they don't produce diagnostics, no write goes through `write_file`. - `apply_v4a_operations` accumulates per-file diagnostics blocks and surfaces a combined block on `PatchResult.lsp_diagnostics`. Each block already carries its `<diagnostics file="...">` header from `LSPService.report_for_file`, so concatenation preserves per-file attribution. Tests added (`test_patch_parser.py::TestV4ALspDiagnosticsPropagation`): - ADD op: `WriteResult.lsp_diagnostics` flows to `PatchResult` - UPDATE op: same - No diagnostics → `PatchResult.lsp_diagnostics is None` (not "") - Multi-file patch: combined block contains every per-file block Verification: - Targeted test scope: 257/257 pass (tests/agent/lsp/, tests/tools/test_file_operations*.py, tests/tools/test_patch_parser.py) - Wider sweep: 5400 pass; 11 failures all pre-existing on origin/main (file_staleness / file_read_guards / file_state_registry — unrelated macOS /var/folders tmp-path sensitivity issues, confirmed by re-running on a clean origin/main checkout) * docs(test): align shell-linter LSP skip docstring with .tsx behavior Copilot review feedback (review #4324947616, comment #3271049036): the test module docstring still listed .tsx alongside .ts/.go/.rs in the skip contract, but .tsx is now intentionally NOT in LINTERS or _SHELL_LINTER_LSP_REDUNDANT. Updated the bullet list to drop .tsx from the skip contract and added a paragraph documenting why .tsx is left out (preserves pre-PR implicit-skip behavior for LSP-disabled users; LSP coverage still happens via _maybe_lsp_diagnostics). * test(lsp): drop unused tmp_path from _make_fops helper Copilot review #3271069484: the helper accepted tmp_path but never used it. Callers still need tmp_path themselves for the file they're asserting against, so we just drop the helper's parameter.
623 lines
23 KiB
Python
623 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
V4A Patch Format Parser
|
|
|
|
Parses the V4A patch format used by codex, cline, and other coding agents.
|
|
|
|
V4A Format:
|
|
*** Begin Patch
|
|
*** Update File: path/to/file.py
|
|
@@ optional context hint @@
|
|
context line (space prefix)
|
|
-removed line (minus prefix)
|
|
+added line (plus prefix)
|
|
*** Add File: path/to/new.py
|
|
+new file content
|
|
+line 2
|
|
*** Delete File: path/to/old.py
|
|
*** Move File: old/path.py -> new/path.py
|
|
*** End Patch
|
|
|
|
Usage:
|
|
from tools.patch_parser import parse_v4a_patch, apply_v4a_operations
|
|
|
|
operations, error = parse_v4a_patch(patch_content)
|
|
if error:
|
|
print(f"Parse error: {error}")
|
|
else:
|
|
result = apply_v4a_operations(operations, file_ops)
|
|
"""
|
|
|
|
import difflib
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Tuple, Any
|
|
from enum import Enum
|
|
|
|
|
|
class OperationType(Enum):
|
|
ADD = "add"
|
|
UPDATE = "update"
|
|
DELETE = "delete"
|
|
MOVE = "move"
|
|
|
|
|
|
@dataclass
|
|
class HunkLine:
|
|
"""A single line in a patch hunk."""
|
|
prefix: str # ' ', '-', or '+'
|
|
content: str
|
|
|
|
|
|
@dataclass
|
|
class Hunk:
|
|
"""A group of changes within a file."""
|
|
context_hint: Optional[str] = None
|
|
lines: List[HunkLine] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class PatchOperation:
|
|
"""A single operation in a V4A patch."""
|
|
operation: OperationType
|
|
file_path: str
|
|
new_path: Optional[str] = None # For move operations
|
|
hunks: List[Hunk] = field(default_factory=list)
|
|
content: Optional[str] = None # For add file operations
|
|
|
|
|
|
def parse_v4a_patch(patch_content: str) -> Tuple[List[PatchOperation], Optional[str]]:
|
|
"""
|
|
Parse a V4A format patch.
|
|
|
|
Args:
|
|
patch_content: The patch text in V4A format
|
|
|
|
Returns:
|
|
Tuple of (operations, error_message)
|
|
- If successful: (list_of_operations, None)
|
|
- If failed: ([], error_description)
|
|
"""
|
|
lines = patch_content.split('\n')
|
|
operations: List[PatchOperation] = []
|
|
|
|
# Find patch boundaries
|
|
start_idx = None
|
|
end_idx = None
|
|
|
|
for i, line in enumerate(lines):
|
|
if '*** Begin Patch' in line or '***Begin Patch' in line:
|
|
start_idx = i
|
|
elif '*** End Patch' in line or '***End Patch' in line:
|
|
end_idx = i
|
|
break
|
|
|
|
if start_idx is None:
|
|
# Try to parse without explicit begin marker
|
|
start_idx = -1
|
|
|
|
if end_idx is None:
|
|
end_idx = len(lines)
|
|
|
|
# Parse operations between boundaries
|
|
i = start_idx + 1
|
|
current_op: Optional[PatchOperation] = None
|
|
current_hunk: Optional[Hunk] = None
|
|
|
|
while i < end_idx:
|
|
line = lines[i]
|
|
|
|
# Check for file operation markers
|
|
update_match = re.match(r'\*\*\*\s*Update\s+File:\s*(.+)', line)
|
|
add_match = re.match(r'\*\*\*\s*Add\s+File:\s*(.+)', line)
|
|
delete_match = re.match(r'\*\*\*\s*Delete\s+File:\s*(.+)', line)
|
|
move_match = re.match(r'\*\*\*\s*Move\s+File:\s*(.+?)\s*->\s*(.+)', line)
|
|
|
|
if update_match:
|
|
# Save previous operation
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
operations.append(current_op)
|
|
|
|
current_op = PatchOperation(
|
|
operation=OperationType.UPDATE,
|
|
file_path=update_match.group(1).strip()
|
|
)
|
|
current_hunk = None
|
|
|
|
elif add_match:
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
operations.append(current_op)
|
|
|
|
current_op = PatchOperation(
|
|
operation=OperationType.ADD,
|
|
file_path=add_match.group(1).strip()
|
|
)
|
|
current_hunk = Hunk()
|
|
|
|
elif delete_match:
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
operations.append(current_op)
|
|
|
|
current_op = PatchOperation(
|
|
operation=OperationType.DELETE,
|
|
file_path=delete_match.group(1).strip()
|
|
)
|
|
operations.append(current_op)
|
|
current_op = None
|
|
current_hunk = None
|
|
|
|
elif move_match:
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
operations.append(current_op)
|
|
|
|
current_op = PatchOperation(
|
|
operation=OperationType.MOVE,
|
|
file_path=move_match.group(1).strip(),
|
|
new_path=move_match.group(2).strip()
|
|
)
|
|
operations.append(current_op)
|
|
current_op = None
|
|
current_hunk = None
|
|
|
|
elif line.startswith('@@'):
|
|
# Context hint / hunk marker
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
|
|
# Extract context hint
|
|
hint_match = re.match(r'@@\s*(.+?)\s*@@', line)
|
|
hint = hint_match.group(1) if hint_match else None
|
|
current_hunk = Hunk(context_hint=hint)
|
|
|
|
elif current_op and line:
|
|
# Parse hunk line
|
|
if current_hunk is None:
|
|
current_hunk = Hunk()
|
|
|
|
if line.startswith('+'):
|
|
current_hunk.lines.append(HunkLine('+', line[1:]))
|
|
elif line.startswith('-'):
|
|
current_hunk.lines.append(HunkLine('-', line[1:]))
|
|
elif line.startswith(' '):
|
|
current_hunk.lines.append(HunkLine(' ', line[1:]))
|
|
elif line.startswith('\\'):
|
|
# "\ No newline at end of file" marker - skip
|
|
pass
|
|
else:
|
|
# Treat as context line (implicit space prefix)
|
|
current_hunk.lines.append(HunkLine(' ', line))
|
|
|
|
i += 1
|
|
|
|
# Don't forget the last operation
|
|
if current_op:
|
|
if current_hunk and current_hunk.lines:
|
|
current_op.hunks.append(current_hunk)
|
|
operations.append(current_op)
|
|
|
|
# Validate the parsed result
|
|
if not operations:
|
|
# Empty patch is not an error — callers get [] and can decide
|
|
return operations, None
|
|
|
|
parse_errors: List[str] = []
|
|
for op in operations:
|
|
if not op.file_path:
|
|
parse_errors.append("Operation with empty file path")
|
|
if op.operation == OperationType.UPDATE and not op.hunks:
|
|
parse_errors.append(f"UPDATE {op.file_path!r}: no hunks found")
|
|
if op.operation == OperationType.MOVE and not op.new_path:
|
|
parse_errors.append(f"MOVE {op.file_path!r}: missing destination path (expected 'src -> dst')")
|
|
|
|
if parse_errors:
|
|
return [], "Parse error: " + "; ".join(parse_errors)
|
|
|
|
return operations, None
|
|
|
|
|
|
def _count_occurrences(text: str, pattern: str) -> int:
|
|
"""Count non-overlapping occurrences of *pattern* in *text*."""
|
|
count = 0
|
|
start = 0
|
|
while True:
|
|
pos = text.find(pattern, start)
|
|
if pos == -1:
|
|
break
|
|
count += 1
|
|
start = pos + 1
|
|
return count
|
|
|
|
|
|
def _validate_operations(
|
|
operations: List[PatchOperation],
|
|
file_ops: Any,
|
|
) -> List[str]:
|
|
"""Validate all operations without writing any files.
|
|
|
|
Returns a list of error strings; an empty list means all operations
|
|
are valid and the apply phase can proceed safely.
|
|
|
|
For UPDATE operations, hunks are simulated in order so that later
|
|
hunks validate against post-earlier-hunk content (matching apply order).
|
|
"""
|
|
# Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
|
|
from tools.fuzzy_match import fuzzy_find_and_replace
|
|
|
|
errors: List[str] = []
|
|
|
|
for op in operations:
|
|
if op.operation == OperationType.UPDATE:
|
|
read_result = file_ops.read_file_raw(op.file_path)
|
|
if read_result.error:
|
|
errors.append(f"{op.file_path}: {read_result.error}")
|
|
continue
|
|
|
|
simulated = read_result.content
|
|
for hunk in op.hunks:
|
|
search_lines = [l.content for l in hunk.lines if l.prefix in {' ', '-'}]
|
|
if not search_lines:
|
|
# Addition-only hunk: validate context hint uniqueness
|
|
if hunk.context_hint:
|
|
occurrences = _count_occurrences(simulated, hunk.context_hint)
|
|
if occurrences == 0:
|
|
errors.append(
|
|
f"{op.file_path}: addition-only hunk context hint "
|
|
f"'{hunk.context_hint}' not found"
|
|
)
|
|
elif occurrences > 1:
|
|
errors.append(
|
|
f"{op.file_path}: addition-only hunk context hint "
|
|
f"'{hunk.context_hint}' is ambiguous "
|
|
f"({occurrences} occurrences)"
|
|
)
|
|
continue
|
|
|
|
search_pattern = '\n'.join(search_lines)
|
|
replace_lines = [l.content for l in hunk.lines if l.prefix in {' ', '+'}]
|
|
replacement = '\n'.join(replace_lines)
|
|
|
|
new_simulated, count, _strategy, match_error = fuzzy_find_and_replace(
|
|
simulated, search_pattern, replacement, replace_all=False
|
|
)
|
|
if count == 0:
|
|
label = f"'{hunk.context_hint}'" if hunk.context_hint else "(no hint)"
|
|
msg = (
|
|
f"{op.file_path}: hunk {label} not found"
|
|
+ (f" — {match_error}" if match_error else "")
|
|
)
|
|
try:
|
|
from tools.fuzzy_match import format_no_match_hint
|
|
msg += format_no_match_hint(match_error, count, search_pattern, simulated)
|
|
except Exception:
|
|
pass
|
|
errors.append(msg)
|
|
else:
|
|
# Advance simulation so subsequent hunks validate correctly.
|
|
# Reuse the result from the call above — no second fuzzy run.
|
|
simulated = new_simulated
|
|
|
|
elif op.operation == OperationType.DELETE:
|
|
read_result = file_ops.read_file_raw(op.file_path)
|
|
if read_result.error:
|
|
errors.append(f"{op.file_path}: file not found for deletion")
|
|
|
|
elif op.operation == OperationType.MOVE:
|
|
if not op.new_path:
|
|
errors.append(f"{op.file_path}: MOVE operation missing destination path")
|
|
continue
|
|
src_result = file_ops.read_file_raw(op.file_path)
|
|
if src_result.error:
|
|
errors.append(f"{op.file_path}: source file not found for move")
|
|
dst_result = file_ops.read_file_raw(op.new_path)
|
|
if not dst_result.error:
|
|
errors.append(
|
|
f"{op.new_path}: destination already exists — move would overwrite"
|
|
)
|
|
|
|
# ADD: parent directory creation handled by write_file; no pre-check needed.
|
|
|
|
return errors
|
|
|
|
|
|
def apply_v4a_operations(operations: List[PatchOperation],
|
|
file_ops: Any) -> 'PatchResult':
|
|
"""Apply V4A patch operations using a file operations interface.
|
|
|
|
Uses a two-phase validate-then-apply approach:
|
|
- Phase 1: validate all operations against current file contents without
|
|
writing anything. If any validation error is found, return immediately
|
|
with no filesystem changes.
|
|
- Phase 2: apply all operations. A failure here (e.g. a race between
|
|
validation and apply) is reported with a note to run ``git diff``.
|
|
|
|
Args:
|
|
operations: List of PatchOperation from parse_v4a_patch
|
|
file_ops: Object with read_file_raw, write_file methods
|
|
|
|
Returns:
|
|
PatchResult with results of all operations
|
|
"""
|
|
# Import here to avoid circular imports
|
|
from tools.file_operations import PatchResult
|
|
|
|
# ---- Phase 1: validate ----
|
|
validation_errors = _validate_operations(operations, file_ops)
|
|
if validation_errors:
|
|
return PatchResult(
|
|
success=False,
|
|
error="Patch validation failed (no files were modified):\n"
|
|
+ "\n".join(f" • {e}" for e in validation_errors),
|
|
)
|
|
|
|
# ---- Phase 2: apply ----
|
|
files_modified = []
|
|
files_created = []
|
|
files_deleted = []
|
|
all_diffs = []
|
|
# Per-file LSP diagnostics blocks captured from underlying write_file
|
|
# calls. V4A bypasses the WriteResult / PatchResult plumbing that
|
|
# write_file and patch_replace use, so without explicit propagation
|
|
# the LSP tier's output gets silently dropped — see
|
|
# ``PatchResult.lsp_diagnostics`` aggregation below.
|
|
lsp_blocks: List[str] = []
|
|
errors = []
|
|
|
|
for op in operations:
|
|
try:
|
|
if op.operation == OperationType.ADD:
|
|
result = _apply_add(op, file_ops)
|
|
if result[0]:
|
|
files_created.append(op.file_path)
|
|
all_diffs.append(result[1])
|
|
if result[2]:
|
|
lsp_blocks.append(result[2])
|
|
else:
|
|
errors.append(f"Failed to add {op.file_path}: {result[1]}")
|
|
|
|
elif op.operation == OperationType.DELETE:
|
|
result = _apply_delete(op, file_ops)
|
|
if result[0]:
|
|
files_deleted.append(op.file_path)
|
|
all_diffs.append(result[1])
|
|
else:
|
|
errors.append(f"Failed to delete {op.file_path}: {result[1]}")
|
|
|
|
elif op.operation == OperationType.MOVE:
|
|
result = _apply_move(op, file_ops)
|
|
if result[0]:
|
|
files_modified.append(f"{op.file_path} -> {op.new_path}")
|
|
all_diffs.append(result[1])
|
|
else:
|
|
errors.append(f"Failed to move {op.file_path}: {result[1]}")
|
|
|
|
elif op.operation == OperationType.UPDATE:
|
|
result = _apply_update(op, file_ops)
|
|
if result[0]:
|
|
files_modified.append(op.file_path)
|
|
all_diffs.append(result[1])
|
|
if result[2]:
|
|
lsp_blocks.append(result[2])
|
|
else:
|
|
errors.append(f"Failed to update {op.file_path}: {result[1]}")
|
|
|
|
except Exception as e:
|
|
errors.append(f"Error processing {op.file_path}: {str(e)}")
|
|
|
|
# Run lint on all modified/created files
|
|
lint_results = {}
|
|
for f in files_modified + files_created:
|
|
if hasattr(file_ops, '_check_lint'):
|
|
lint_result = file_ops._check_lint(f)
|
|
lint_results[f] = lint_result.to_dict()
|
|
|
|
combined_diff = '\n'.join(all_diffs)
|
|
|
|
# Combine per-file LSP diagnostics blocks. Each block already has
|
|
# the ``<diagnostics file="...">`` header from
|
|
# ``LSPService.report_for_file`` so concatenation is safe — the
|
|
# agent (and any downstream parsers) can still attribute each
|
|
# diagnostic to its file.
|
|
combined_lsp = "\n\n".join(lsp_blocks) if lsp_blocks else None
|
|
|
|
if errors:
|
|
return PatchResult(
|
|
success=False,
|
|
diff=combined_diff,
|
|
files_modified=files_modified,
|
|
files_created=files_created,
|
|
files_deleted=files_deleted,
|
|
lint=lint_results if lint_results else None,
|
|
lsp_diagnostics=combined_lsp,
|
|
error="Apply phase failed (state may be inconsistent — run `git diff` to assess):\n"
|
|
+ "\n".join(f" • {e}" for e in errors),
|
|
)
|
|
|
|
return PatchResult(
|
|
success=True,
|
|
diff=combined_diff,
|
|
files_modified=files_modified,
|
|
files_created=files_created,
|
|
files_deleted=files_deleted,
|
|
lint=lint_results if lint_results else None,
|
|
lsp_diagnostics=combined_lsp,
|
|
)
|
|
|
|
|
|
def _apply_add(op: PatchOperation, file_ops: Any) -> Tuple[bool, str, Optional[str]]:
|
|
"""Apply an add file operation.
|
|
|
|
Returns ``(success, diff_or_error, lsp_diagnostics)``. The third
|
|
element carries the formatted ``<diagnostics>`` block from
|
|
:class:`WriteResult.lsp_diagnostics` so V4A patches can surface
|
|
semantic diagnostics from the LSP layer — without this, the LSP
|
|
tier would silently swallow them on the V4A code path.
|
|
"""
|
|
# Extract content from hunks (all + lines)
|
|
content_lines = []
|
|
for hunk in op.hunks:
|
|
for line in hunk.lines:
|
|
if line.prefix == '+':
|
|
content_lines.append(line.content)
|
|
|
|
content = '\n'.join(content_lines)
|
|
|
|
result = file_ops.write_file(op.file_path, content)
|
|
if result.error:
|
|
return False, result.error, None
|
|
|
|
diff = f"--- /dev/null\n+++ b/{op.file_path}\n"
|
|
diff += '\n'.join(f"+{line}" for line in content_lines)
|
|
|
|
return True, diff, getattr(result, "lsp_diagnostics", None)
|
|
|
|
|
|
def _apply_delete(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
|
"""Apply a delete file operation."""
|
|
# Read before deleting so we can produce a real unified diff.
|
|
# Validation already confirmed existence; this guards against races.
|
|
read_result = file_ops.read_file_raw(op.file_path)
|
|
if read_result.error:
|
|
return False, f"Cannot delete {op.file_path}: file not found"
|
|
|
|
result = file_ops.delete_file(op.file_path)
|
|
if result.error:
|
|
return False, result.error
|
|
|
|
removed_lines = read_result.content.splitlines(keepends=True)
|
|
diff = ''.join(difflib.unified_diff(
|
|
removed_lines, [],
|
|
fromfile=f"a/{op.file_path}",
|
|
tofile="/dev/null",
|
|
))
|
|
return True, diff or f"# Deleted: {op.file_path}"
|
|
|
|
|
|
def _apply_move(op: PatchOperation, file_ops: Any) -> Tuple[bool, str]:
|
|
"""Apply a move file operation."""
|
|
result = file_ops.move_file(op.file_path, op.new_path)
|
|
if result.error:
|
|
return False, result.error
|
|
|
|
diff = f"# Moved: {op.file_path} -> {op.new_path}"
|
|
return True, diff
|
|
|
|
|
|
def _apply_update(op: PatchOperation, file_ops: Any) -> Tuple[bool, str, Optional[str]]:
|
|
"""Apply an update file operation.
|
|
|
|
Returns ``(success, diff_or_error, lsp_diagnostics)`` — see
|
|
:func:`_apply_add` for the rationale on the third element.
|
|
"""
|
|
# Deferred import: breaks the patch_parser ↔ fuzzy_match circular dependency
|
|
from tools.fuzzy_match import fuzzy_find_and_replace
|
|
|
|
# Read current content — raw so no line-number prefixes or per-line truncation
|
|
read_result = file_ops.read_file_raw(op.file_path)
|
|
|
|
if read_result.error:
|
|
return False, f"Cannot read file: {read_result.error}", None
|
|
|
|
current_content = read_result.content
|
|
|
|
# Apply each hunk
|
|
new_content = current_content
|
|
|
|
for hunk in op.hunks:
|
|
# Build search pattern from context and removed lines
|
|
search_lines = []
|
|
replace_lines = []
|
|
|
|
for line in hunk.lines:
|
|
if line.prefix == ' ':
|
|
search_lines.append(line.content)
|
|
replace_lines.append(line.content)
|
|
elif line.prefix == '-':
|
|
search_lines.append(line.content)
|
|
elif line.prefix == '+':
|
|
replace_lines.append(line.content)
|
|
|
|
if search_lines:
|
|
search_pattern = '\n'.join(search_lines)
|
|
replacement = '\n'.join(replace_lines)
|
|
|
|
new_content, count, _strategy, error = fuzzy_find_and_replace(
|
|
new_content, search_pattern, replacement, replace_all=False
|
|
)
|
|
|
|
if error and count == 0:
|
|
# Try with context hint if available
|
|
if hunk.context_hint:
|
|
# Find the context hint location and search nearby
|
|
hint_pos = new_content.find(hunk.context_hint)
|
|
if hint_pos != -1:
|
|
# Search in a window around the hint
|
|
window_start = max(0, hint_pos - 500)
|
|
window_end = min(len(new_content), hint_pos + 2000)
|
|
window = new_content[window_start:window_end]
|
|
|
|
window_new, count, _strategy, error = fuzzy_find_and_replace(
|
|
window, search_pattern, replacement, replace_all=False
|
|
)
|
|
|
|
if count > 0:
|
|
new_content = new_content[:window_start] + window_new + new_content[window_end:]
|
|
error = None
|
|
|
|
if error:
|
|
err_msg = f"Could not apply hunk: {error}"
|
|
try:
|
|
from tools.fuzzy_match import format_no_match_hint
|
|
err_msg += format_no_match_hint(error, 0, search_pattern, new_content)
|
|
except Exception:
|
|
pass
|
|
return False, err_msg, None
|
|
else:
|
|
# Addition-only hunk (no context or removed lines).
|
|
# Insert at the location indicated by the context hint, or at end of file.
|
|
insert_text = '\n'.join(replace_lines)
|
|
if hunk.context_hint:
|
|
occurrences = _count_occurrences(new_content, hunk.context_hint)
|
|
if occurrences == 0:
|
|
# Hint not found — append at end as a safe fallback
|
|
new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
|
|
elif occurrences > 1:
|
|
return False, (
|
|
f"Addition-only hunk: context hint '{hunk.context_hint}' is ambiguous "
|
|
f"({occurrences} occurrences) — provide a more unique hint"
|
|
), None
|
|
else:
|
|
hint_pos = new_content.find(hunk.context_hint)
|
|
# Insert after the line containing the context hint
|
|
eol = new_content.find('\n', hint_pos)
|
|
if eol != -1:
|
|
new_content = new_content[:eol + 1] + insert_text + '\n' + new_content[eol + 1:]
|
|
else:
|
|
new_content = new_content + '\n' + insert_text
|
|
else:
|
|
new_content = new_content.rstrip('\n') + '\n' + insert_text + '\n'
|
|
|
|
# Write new content
|
|
write_result = file_ops.write_file(op.file_path, new_content)
|
|
if write_result.error:
|
|
return False, write_result.error, None
|
|
|
|
# Generate diff
|
|
diff_lines = difflib.unified_diff(
|
|
current_content.splitlines(keepends=True),
|
|
new_content.splitlines(keepends=True),
|
|
fromfile=f"a/{op.file_path}",
|
|
tofile=f"b/{op.file_path}"
|
|
)
|
|
diff = ''.join(diff_lines)
|
|
|
|
return True, diff, getattr(write_result, "lsp_diagnostics", None)
|