feat: Add edit_file tool + file-change diffs (#1239)

* Add edit_file tool + file-change diffs

edit_file is an exact old_string -> new_string replacement on a file on disk
(fails if old_string is missing or non-unique unless replace_all); write_file
also returns a unified diff. Diffs render collapsed in the tool bubble
(filename + +adds/-dels, theme colors); the raw JSON command box is hidden.

Security: edit_file is a sensitive filesystem-write tool, treated everywhere
write_file is —
  - added to NON_ADMIN_BLOCKED_TOOLS (is_public_blocked_tool / blocked_tools_for_owner),
    so on auth-enabled deployments a non-admin cannot run it; execute_tool_block
    refuses it for non-admin owners.
  - confined by the same path policy as read_file/write_file (allowlist +
    sensitive-file deny) via _resolve_tool_path.

Disambiguation in tool descriptions + bash prompt: edit_file/write_file are the
only way to write files (they show a diff) — never edit_document (editor panel)
or a bash heredoc/redirect.

Tests (tests/test_edit_file.py): non-admin block (policy + execution gate),
successful edit, not-found old_string, non-unique old_string (+ replace_all),
and path outside the allowed roots.

Files: src/tool_execution.py, src/agent_loop.py, src/tool_schemas.py,
src/agent_tools.py, src/tool_index.py, static/js/chat.js, static/style.css,
tests/test_edit_file.py.

* Drop redundant import os in write_file closure

os is already imported at module top.
This commit is contained in:
Kenny Van de Maele
2026-06-04 18:29:10 +02:00
committed by GitHub
parent 147d1fbde6
commit 7443c36bd9
11 changed files with 351 additions and 12 deletions

View File

@@ -20,6 +20,108 @@ from src.tool_security import is_public_blocked_tool, owner_is_admin_or_single_u
MAX_OUTPUT_CHARS = 10_000
MAX_READ_CHARS = 20_000
MAX_DIFF_LINES = 400 # cap unified-diff size returned to the UI
def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
"""Build a unified diff of a file write for display in the chat.
Returns {"text": <unified diff>, "added": N, "removed": M, "new_file": bool}
or None when there's no textual change. Truncates very large diffs.
"""
if old == new:
return None
import difflib
old_lines = old.splitlines()
new_lines = new.splitlines()
label = path or "file"
diff_lines = list(difflib.unified_diff(
old_lines, new_lines,
fromfile=f"a/{label}", tofile=f"b/{label}",
lineterm="",
))
added = sum(1 for l in diff_lines if l.startswith("+") and not l.startswith("+++"))
removed = sum(1 for l in diff_lines if l.startswith("-") and not l.startswith("---"))
truncated = False
if len(diff_lines) > MAX_DIFF_LINES:
diff_lines = diff_lines[:MAX_DIFF_LINES]
truncated = True
text = "\n".join(diff_lines)
if truncated:
text += f"\n… diff truncated at {MAX_DIFF_LINES} lines"
return {
"text": text,
"added": added,
"removed": removed,
"new_file": old == "",
"file": os.path.basename(path) or (path or "file"),
}
async def _do_edit_file(content: str) -> Dict[str, Any]:
"""Exact string-replacement edit of an on-disk file.
content is JSON: {"path", "old_string", "new_string", "replace_all"?}.
Fails if old_string is missing or non-unique (unless replace_all) so the
model can't silently edit the wrong place. Returns a unified diff for the UI.
"""
try:
args = json.loads(content) if content.strip().startswith("{") else {}
except (json.JSONDecodeError, TypeError):
args = {}
raw_path = (args.get("path") or "").strip()
old = args.get("old_string", "")
new = args.get("new_string", "")
replace_all = bool(args.get("replace_all", False))
if not raw_path:
return {"error": "edit_file: path required", "exit_code": 1}
# Confine to the same allowlist + sensitive-file policy as read/write_file.
try:
path = _resolve_tool_path(raw_path)
except ValueError as e:
return {"error": f"edit_file: {e}", "exit_code": 1}
if old == "":
return {"error": "edit_file: old_string required (use write_file to create a file)", "exit_code": 1}
if old == new:
return {"error": "edit_file: old_string and new_string are identical", "exit_code": 1}
def _apply():
with open(path, "r", encoding="utf-8") as f:
original = f.read()
count = original.count(old)
if count == 0:
return original, None, "not_found"
if count > 1 and not replace_all:
return original, None, f"not_unique:{count}"
updated = original.replace(old, new) if replace_all else original.replace(old, new, 1)
with open(path, "w", encoding="utf-8") as f:
f.write(updated)
return original, updated, "ok"
try:
original, updated, status = await asyncio.to_thread(_apply)
except FileNotFoundError:
return {"error": f"edit_file: {path}: not found (use write_file to create it)", "exit_code": 1}
except (IsADirectoryError, UnicodeDecodeError):
return {"error": f"edit_file: {path}: not an editable text file", "exit_code": 1}
except PermissionError:
return {"error": f"edit_file: {path}: permission denied", "exit_code": 1}
except OSError as e:
return {"error": f"edit_file: {path}: {e}", "exit_code": 1}
if status == "not_found":
return {"error": f"edit_file: old_string not found in {path}. Read the file and match it exactly.", "exit_code": 1}
if status.startswith("not_unique"):
n = status.split(":", 1)[1]
return {"error": f"edit_file: old_string is not unique in {path} ({n} matches). Add surrounding context or set replace_all=true.", "exit_code": 1}
n = original.count(old)
result = {"output": f"Edited {path} ({n} replacement{'s' if n != 1 else ''})", "exit_code": 0}
diff = _unified_diff(original, updated, path)
if diff:
result["diff"] = diff
return result
# ---------------------------------------------------------------------------
# Path confinement for read_file / write_file
@@ -544,18 +646,30 @@ async def _direct_fallback(
return {"error": f"write_file: {e}", "exit_code": 1}
try:
def _write():
# Capture prior content (best-effort, text) so we can show a
# before/after diff. Missing/binary file → treat as empty.
old = ""
try:
with open(path, "r", encoding="utf-8") as f:
old = f.read()
except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError):
old = ""
d = os.path.dirname(path)
if d:
os.makedirs(d, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(body)
return len(body)
size = await asyncio.to_thread(_write)
return old, len(body)
old_content, size = await asyncio.to_thread(_write)
except PermissionError:
return {"error": f"write_file: {path}: permission denied", "exit_code": 1}
except OSError as e:
return {"error": f"write_file: {path}: {e}", "exit_code": 1}
return {"output": f"Wrote {size} bytes to {path}", "exit_code": 0}
diff = _unified_diff(old_content, body, path)
result = {"output": f"Wrote {size} bytes to {path}", "exit_code": 0}
if diff:
result["diff"] = diff
return result
if tool == "web_search":
from src.search import comprehensive_web_search
@@ -894,6 +1008,9 @@ async def execute_tool_block(
elif tool == "edit_image":
desc = "edit_image"
result = await do_edit_image(content, owner=owner)
elif tool == "edit_file":
result = await _do_edit_file(content)
desc = result.get("output") or result.get("error") or "edit_file"
elif tool == "trigger_research":
desc = "trigger_research"
result = await do_trigger_research(content, owner=owner)