feat: add code-navigation tools (grep, glob, ls) + read_file line ranges (#1670)
Gives the agent first-class code navigation instead of shelling out via bash (token-heavy, unreliable on weaker models, unstructured). Mirrors the Grep/Glob/Read primitives that Claude Code / opencode expose. - grep: regex search over file contents across a tree. Uses ripgrep when available (with explicit excludes so junk dirs are skipped even without a .gitignore); falls back to a pure-Python walk+regex when rg is absent. Returns file:line:match, capped. - glob: find files by glob pattern (recursive), newest first. - ls: list a directory (folders first, then files with sizes). - read_file: optional offset/limit for line-range reads of large files (plain-path calls stay back-compatible). All confined by the same path policy as read_file (_resolve_tool_path: data/tmp allowlist + sensitive-file deny). Junk dirs (.git, node_modules, venv, __pycache__, dist/build, …) skipped. Output capped (200 hits, 400 chars/line). Admin-gated like the other filesystem tools. Wiring: schemas + native arg->content serializer (src/tool_schemas.py), tool tags (src/agent_tools.py), always-available + descriptions (src/tool_index.py), admin gate (src/tool_security.py), dispatch + impls (src/tool_execution.py). Tests: tests/test_code_nav_tools.py — match/skip-junk/ignore-case/glob-filter, allowlist rejection, glob/ls, read-range, and the no-ripgrep Python fallback.
This commit is contained in:
committed by
GitHub
parent
7443c36bd9
commit
1f00fff837
@@ -27,6 +27,7 @@ MAX_READ_CHARS = 20_000
|
||||
|
||||
# Tool types that trigger execution
|
||||
TOOL_TAGS = {"bash", "python", "web_search", "web_fetch", "read_file", "write_file", "edit_file",
|
||||
"grep", "glob", "ls",
|
||||
"create_document", "update_document", "edit_document",
|
||||
"search_chats",
|
||||
"chat_with_model", "create_session", "list_sessions",
|
||||
|
||||
@@ -288,6 +288,34 @@ def get_mcp_manager():
|
||||
return agent_tools.get_mcp_manager()
|
||||
|
||||
|
||||
# Directories ignored by the code-nav tools' Python fallbacks so results aren't
|
||||
# polluted by VCS internals / dependency trees / build caches. ripgrep already
|
||||
# honours .gitignore; this is the parity floor for the no-rg path (and the
|
||||
# explicit excludes passed to rg so it skips them even without a .gitignore).
|
||||
_CODENAV_SKIP_DIRS = frozenset({
|
||||
".git", ".hg", ".svn", "node_modules", "venv", ".venv", "__pycache__",
|
||||
".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build",
|
||||
".next", ".cache", "site-packages", ".idea", ".tox",
|
||||
})
|
||||
# Per-tool result caps (keep tool output cheap + model-friendly).
|
||||
_CODENAV_MAX_HITS = 200
|
||||
_CODENAV_MAX_LINE = 400
|
||||
|
||||
|
||||
def _resolve_search_root(raw_path: str) -> str:
|
||||
"""Resolve + confine a code-nav path (grep/glob/ls).
|
||||
|
||||
Empty path → the agent's primary root (first allowlisted root, i.e. the
|
||||
project data dir). A supplied path is confined by the same allowlist +
|
||||
sensitive-file policy as read_file (_resolve_tool_path).
|
||||
"""
|
||||
raw = (raw_path or "").strip()
|
||||
if not raw:
|
||||
roots = _tool_path_roots()
|
||||
return roots[0] if roots else os.path.realpath(".")
|
||||
return _resolve_tool_path(raw)
|
||||
|
||||
|
||||
def _truncate(text: str, limit: int = MAX_OUTPUT_CHARS) -> str:
|
||||
if len(text) > limit:
|
||||
return text[:limit] + f"\n... (truncated, {len(text)} chars total)"
|
||||
@@ -614,14 +642,42 @@ async def _direct_fallback(
|
||||
return {"output": output or "(no output)", "exit_code": rc or 0}
|
||||
|
||||
if tool == "read_file":
|
||||
raw_path = content.split("\n", 1)[0].strip()
|
||||
# Args: plain path on line 1 (back-compat) OR JSON
|
||||
# {path, offset?, limit?} where offset/limit are a 1-based line range.
|
||||
raw_path, offset, limit = content.split("\n", 1)[0].strip(), 0, 0
|
||||
_stripped = content.strip()
|
||||
if _stripped.startswith("{"):
|
||||
try:
|
||||
_a = _json.loads(_stripped)
|
||||
raw_path = str(_a.get("path", "")).strip()
|
||||
offset = int(_a.get("offset") or 0)
|
||||
limit = int(_a.get("limit") or 0)
|
||||
except (_json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
path = _resolve_tool_path(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"read_file: {e}", "exit_code": 1}
|
||||
try:
|
||||
# Run blocking read in a thread to keep the loop responsive
|
||||
# Run blocking read in a thread to keep the loop responsive.
|
||||
def _read():
|
||||
if offset > 0 or limit > 0:
|
||||
# Line-range read: slice [offset, offset+limit).
|
||||
start = max(offset, 1)
|
||||
out, n, budget = [], 0, MAX_READ_CHARS
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
for i, line in enumerate(f, 1):
|
||||
if i < start:
|
||||
continue
|
||||
if limit > 0 and n >= limit:
|
||||
break
|
||||
out.append(line)
|
||||
n += 1
|
||||
budget -= len(line)
|
||||
if budget <= 0:
|
||||
out.append(f"\n... [truncated at {MAX_READ_CHARS} chars]")
|
||||
break
|
||||
return "".join(out)
|
||||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||||
return f.read(MAX_READ_CHARS + 1)
|
||||
data = await asyncio.to_thread(_read)
|
||||
@@ -629,10 +685,11 @@ async def _direct_fallback(
|
||||
return {"error": f"read_file: {path}: not found", "exit_code": 1}
|
||||
except PermissionError:
|
||||
return {"error": f"read_file: {path}: permission denied", "exit_code": 1}
|
||||
except IsADirectoryError:
|
||||
return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1}
|
||||
except OSError as e:
|
||||
return {"error": f"read_file: {path}: {e}", "exit_code": 1}
|
||||
truncated = len(data) > MAX_READ_CHARS
|
||||
if truncated:
|
||||
if not (offset > 0 or limit > 0) and len(data) > MAX_READ_CHARS:
|
||||
data = data[:MAX_READ_CHARS] + f"\n... [truncated at {MAX_READ_CHARS} chars]"
|
||||
return {"output": data, "exit_code": 0}
|
||||
|
||||
@@ -671,6 +728,196 @@ async def _direct_fallback(
|
||||
result["diff"] = diff
|
||||
return result
|
||||
|
||||
if tool == "grep":
|
||||
# Args (JSON): {pattern, path?, glob?, ignore_case?, max_results?}.
|
||||
# Bare string → treated as the pattern.
|
||||
args: Dict[str, Any] = {}
|
||||
_s = (content or "").strip()
|
||||
if _s.startswith("{"):
|
||||
try:
|
||||
args = _json.loads(_s)
|
||||
except _json.JSONDecodeError:
|
||||
args = {}
|
||||
else:
|
||||
args = {"pattern": _s}
|
||||
pattern = str(args.get("pattern", "")).strip()
|
||||
if not pattern:
|
||||
return {"error": "grep: pattern is required", "exit_code": 1}
|
||||
ignore_case = bool(args.get("ignore_case"))
|
||||
glob_pat = str(args.get("glob", "") or "").strip()
|
||||
try:
|
||||
max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS)
|
||||
except (TypeError, ValueError):
|
||||
max_hits = _CODENAV_MAX_HITS
|
||||
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"grep: {e}", "exit_code": 1}
|
||||
|
||||
def _grep():
|
||||
import re as _re
|
||||
import shutil
|
||||
rg = shutil.which("rg")
|
||||
if rg:
|
||||
cmd = [rg, "--line-number", "--no-heading", "--color=never",
|
||||
"--max-count", str(max_hits)]
|
||||
if ignore_case:
|
||||
cmd.append("--ignore-case")
|
||||
if glob_pat:
|
||||
cmd += ["--glob", glob_pat]
|
||||
# Exclude junk dirs even when the tree has no .gitignore, so
|
||||
# results match the Python fallback's skip set.
|
||||
for _d in _CODENAV_SKIP_DIRS:
|
||||
cmd += ["--glob", f"!**/{_d}/**"]
|
||||
cmd += ["--regexp", pattern, root]
|
||||
try:
|
||||
import subprocess
|
||||
p = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
|
||||
lines = [ln for ln in (p.stdout or "").splitlines() if ln][:max_hits]
|
||||
return lines, None
|
||||
except subprocess.TimeoutExpired:
|
||||
return None, "grep: timed out"
|
||||
except Exception as _e:
|
||||
return None, f"grep: {_e}"
|
||||
# Python fallback (no ripgrep): walk + regex.
|
||||
try:
|
||||
rx = _re.compile(pattern, _re.IGNORECASE if ignore_case else 0)
|
||||
except _re.error as _e:
|
||||
return None, f"grep: bad pattern: {_e}"
|
||||
import fnmatch
|
||||
hits = []
|
||||
if os.path.isfile(root):
|
||||
file_iter = [root]
|
||||
else:
|
||||
file_iter = []
|
||||
for dp, dns, fns in os.walk(root):
|
||||
dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS]
|
||||
for fn in fns:
|
||||
if glob_pat and not fnmatch.fnmatch(fn, glob_pat):
|
||||
continue
|
||||
file_iter.append(os.path.join(dp, fn))
|
||||
for fp in file_iter:
|
||||
if len(hits) >= max_hits:
|
||||
break
|
||||
try:
|
||||
with open(fp, "r", encoding="utf-8", errors="strict") as f:
|
||||
for i, line in enumerate(f, 1):
|
||||
if rx.search(line):
|
||||
hits.append(f"{fp}:{i}:{line.rstrip()[:_CODENAV_MAX_LINE]}")
|
||||
if len(hits) >= max_hits:
|
||||
break
|
||||
except (UnicodeDecodeError, OSError):
|
||||
continue # skip binary / unreadable
|
||||
return hits, None
|
||||
|
||||
lines, err = await asyncio.to_thread(_grep)
|
||||
if err:
|
||||
return {"error": err, "exit_code": 1}
|
||||
if not lines:
|
||||
return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0}
|
||||
out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines)
|
||||
if len(lines) >= max_hits:
|
||||
out += f"\n... [capped at {max_hits} matches]"
|
||||
return {"output": _truncate(out), "exit_code": 0}
|
||||
|
||||
if tool == "glob":
|
||||
args = {}
|
||||
_s = (content or "").strip()
|
||||
if _s.startswith("{"):
|
||||
try:
|
||||
args = _json.loads(_s)
|
||||
except _json.JSONDecodeError:
|
||||
args = {}
|
||||
else:
|
||||
args = {"pattern": _s}
|
||||
pattern = str(args.get("pattern", "")).strip()
|
||||
if not pattern:
|
||||
return {"error": "glob: pattern is required", "exit_code": 1}
|
||||
try:
|
||||
root = _resolve_search_root(str(args.get("path", "")))
|
||||
except ValueError as e:
|
||||
return {"error": f"glob: {e}", "exit_code": 1}
|
||||
|
||||
def _glob():
|
||||
from pathlib import Path
|
||||
base = Path(root)
|
||||
if not base.is_dir():
|
||||
return None, f"glob: {root}: not a directory"
|
||||
matched = []
|
||||
try:
|
||||
for p in base.rglob(pattern):
|
||||
if set(p.relative_to(base).parts) & _CODENAV_SKIP_DIRS:
|
||||
continue
|
||||
try:
|
||||
mtime = p.stat().st_mtime
|
||||
except OSError:
|
||||
mtime = 0
|
||||
matched.append((mtime, str(p)))
|
||||
if len(matched) > _CODENAV_MAX_HITS * 5:
|
||||
break
|
||||
except (OSError, ValueError) as _e:
|
||||
return None, f"glob: {_e}"
|
||||
matched.sort(key=lambda t: t[0], reverse=True) # newest first
|
||||
return [pth for _, pth in matched[:_CODENAV_MAX_HITS]], None
|
||||
|
||||
paths, err = await asyncio.to_thread(_glob)
|
||||
if err:
|
||||
return {"error": err, "exit_code": 1}
|
||||
if not paths:
|
||||
return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0}
|
||||
out = "\n".join(paths)
|
||||
if len(paths) >= _CODENAV_MAX_HITS:
|
||||
out += f"\n... [capped at {_CODENAV_MAX_HITS} files]"
|
||||
return {"output": _truncate(out), "exit_code": 0}
|
||||
|
||||
if tool == "ls":
|
||||
raw_path = ""
|
||||
_s = (content or "").strip()
|
||||
if _s.startswith("{"):
|
||||
try:
|
||||
raw_path = str(_json.loads(_s).get("path", "")).strip()
|
||||
except _json.JSONDecodeError:
|
||||
raw_path = ""
|
||||
else:
|
||||
raw_path = _s.split("\n", 1)[0].strip()
|
||||
try:
|
||||
root = _resolve_search_root(raw_path)
|
||||
except ValueError as e:
|
||||
return {"error": f"ls: {e}", "exit_code": 1}
|
||||
|
||||
def _ls():
|
||||
if not os.path.isdir(root):
|
||||
return None, f"ls: {root}: not a directory"
|
||||
rows = []
|
||||
try:
|
||||
with os.scandir(root) as it:
|
||||
for entry in it:
|
||||
if entry.name.startswith("."):
|
||||
continue
|
||||
try:
|
||||
is_dir = entry.is_dir(follow_symlinks=False)
|
||||
size = entry.stat(follow_symlinks=False).st_size if not is_dir else 0
|
||||
except OSError:
|
||||
continue
|
||||
rows.append((is_dir, entry.name, size))
|
||||
except (PermissionError, OSError) as _e:
|
||||
return None, f"ls: {_e}"
|
||||
rows.sort(key=lambda r: (not r[0], r[1].lower())) # dirs first, then name
|
||||
lines = [f"{root}:"]
|
||||
for is_dir, name, size in rows[:_CODENAV_MAX_HITS]:
|
||||
lines.append(f" {name}/" if is_dir else f" {name} ({size} B)")
|
||||
if len(rows) > _CODENAV_MAX_HITS:
|
||||
lines.append(f" ... [{len(rows) - _CODENAV_MAX_HITS} more]")
|
||||
if not rows:
|
||||
lines.append(" (empty)")
|
||||
return "\n".join(lines), None
|
||||
|
||||
out, err = await asyncio.to_thread(_ls)
|
||||
if err:
|
||||
return {"error": err, "exit_code": 1}
|
||||
return {"output": _truncate(out), "exit_code": 0}
|
||||
|
||||
if tool == "web_search":
|
||||
from src.search import comprehensive_web_search
|
||||
raw = content.strip()
|
||||
@@ -909,6 +1156,12 @@ async def execute_tool_block(
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb)
|
||||
elif tool in ("grep", "glob", "ls"):
|
||||
# Code-navigation tools — no MCP server; run the direct implementation.
|
||||
first_line = content.split(chr(10))[0][:80]
|
||||
desc = f"{tool}: {first_line}"
|
||||
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
|
||||
or {"error": f"{tool}: execution failed", "exit_code": 1}
|
||||
elif tool == "create_document":
|
||||
title = content.split("\n")[0].strip()[:60]
|
||||
desc = f"create_document: {title}"
|
||||
|
||||
@@ -23,6 +23,7 @@ logger = logging.getLogger(__name__)
|
||||
# These are the most commonly needed and should never be missing.
|
||||
ALWAYS_AVAILABLE = frozenset({
|
||||
"bash", "python", "web_search", "web_fetch", "read_file",
|
||||
"grep", "glob", "ls", # code-navigation tools (admin-gated by tool_security)
|
||||
"api_call", # For configured integrations (Miniflux, Gitea, Linkding, etc.)
|
||||
# The two genuinely AMBIENT cookbook tools — "what's running" and
|
||||
# "kill it" can be asked any time without prior cookbook context,
|
||||
@@ -63,7 +64,10 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = {
|
||||
"python": "Execute Python code for computation, data processing, math, scripting, parsing, API calls. Not for writing code for the user.",
|
||||
"web_search": "Quick single web lookup for a fact, current event, or doc mid-task. NOT for 'research X' / 'do research on X' requests — those are deep-research jobs (use trigger_research). web_search = one query; trigger_research = a full researched report in the sidebar.",
|
||||
"web_fetch": "Fetch and read the text content of a specific URL/website the user names (e.g. 'check example.com', 'open this link'). Use when you have a concrete URL; for open-ended lookups use web_search instead.",
|
||||
"read_file": "Read a file from disk and return its contents. View source code, config files, logs.",
|
||||
"read_file": "Read a file from disk and return its contents. View source code, config files, logs. Supports an optional line range (offset/limit) for large files.",
|
||||
"grep": "Search file CONTENTS for a regex across a directory tree (ripgrep-backed, honours .gitignore). Returns file:line:match. Use to find where code/symbols/strings live — prefer over bash grep.",
|
||||
"glob": "Find FILES by glob pattern (e.g. '**/*.py'), newest first. Use to locate files by name/extension — prefer over bash find/ls.",
|
||||
"ls": "List a directory's entries (folders then files with sizes). Use to see what's in a folder — prefer over bash ls.",
|
||||
"write_file": "Write/create or fully rewrite a file ON DISK (source code, configs, project files). Use for new files or full rewrites — NOT create_document (editor panel) and NOT a bash heredoc.",
|
||||
"edit_file": "Edit an existing file ON DISK by exact string replacement (fix a bug, change a function). Shows a diff. The tool for changing files on disk — NOT edit_document (editor panel) and NOT bash sed/heredoc.",
|
||||
"create_document": "Create a new document in the editor panel. For code, articles, text content longer than 15 lines, unless an already-open document/email draft is the obvious target. If an email compose draft is open, edit that draft instead of creating another document.",
|
||||
|
||||
@@ -82,16 +82,65 @@ FUNCTION_TOOL_SCHEMAS = [
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"description": "Read a file from disk",
|
||||
"description": "Read a file from disk. Optionally read a line range with offset/limit for large files.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "File path to read"}
|
||||
"path": {"type": "string", "description": "File path to read"},
|
||||
"offset": {"type": "integer", "description": "1-based line to start reading from (optional)"},
|
||||
"limit": {"type": "integer", "description": "Max number of lines to read from offset (optional)"}
|
||||
},
|
||||
"required": ["path"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "grep",
|
||||
"description": "Search file contents for a regular expression across a directory tree (uses ripgrep when available, respecting .gitignore). Returns file:line:match. PREFER this over `bash grep/rg` for code search — confined to the allowed roots, structured output.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string", "description": "Regular expression to search for"},
|
||||
"path": {"type": "string", "description": "Directory or file to search (optional; defaults to the project root)"},
|
||||
"glob": {"type": "string", "description": "Only search files matching this glob, e.g. '*.py' (optional)"},
|
||||
"ignore_case": {"type": "boolean", "description": "Case-insensitive match (optional)"},
|
||||
"max_results": {"type": "integer", "description": "Max matches to return (optional)"}
|
||||
},
|
||||
"required": ["pattern"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "glob",
|
||||
"description": "Find files by glob pattern (recursive), newest first. e.g. '**/*.py'. PREFER this over `bash find/ls` for locating files — confined to the allowed roots.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string", "description": "Glob pattern, e.g. '**/*.ts' or 'src/**/test_*.py'"},
|
||||
"path": {"type": "string", "description": "Base directory (optional; defaults to the project root)"}
|
||||
},
|
||||
"required": ["pattern"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "ls",
|
||||
"description": "List the entries of a directory (folders first, then files with sizes). PREFER this over `bash ls` — confined to the allowed roots.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "Directory to list (optional; defaults to the project root)"}
|
||||
},
|
||||
"required": []
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
@@ -1128,7 +1177,13 @@ def function_call_to_tool_block(name: str, arguments: str) -> Optional[ToolBlock
|
||||
else:
|
||||
content = args.get("query", "")
|
||||
elif tool_type == "read_file":
|
||||
content = args.get("path", "")
|
||||
# Plain path (back-compat) unless a line range is requested → JSON.
|
||||
if args.get("offset") or args.get("limit"):
|
||||
content = json.dumps(args)
|
||||
else:
|
||||
content = args.get("path", "")
|
||||
elif tool_type in ("grep", "glob", "ls"):
|
||||
content = json.dumps(args) if args else "{}"
|
||||
elif tool_type == "write_file":
|
||||
content = args.get("path", "") + "\n" + args.get("content", "")
|
||||
elif tool_type == "edit_file":
|
||||
|
||||
@@ -17,6 +17,9 @@ NON_ADMIN_BLOCKED_TOOLS = {
|
||||
"read_file",
|
||||
"write_file",
|
||||
"edit_file",
|
||||
"grep",
|
||||
"glob",
|
||||
"ls",
|
||||
"search_chats",
|
||||
"manage_memory",
|
||||
"manage_skills",
|
||||
|
||||
140
tests/test_code_nav_tools.py
Normal file
140
tests/test_code_nav_tools.py
Normal file
@@ -0,0 +1,140 @@
|
||||
"""Tests for the code-navigation tools (grep, glob, ls) + read_file line range."""
|
||||
import os
|
||||
import shutil
|
||||
import asyncio
|
||||
import tempfile
|
||||
import pytest
|
||||
|
||||
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/test_code_nav.db")
|
||||
|
||||
from src.tool_execution import _direct_fallback
|
||||
|
||||
|
||||
def _run(tool, content):
|
||||
return asyncio.run(_direct_fallback(tool, content))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def repo():
|
||||
# Built under /tmp, which is on the default tool-path allowlist.
|
||||
root = tempfile.mkdtemp(dir="/tmp", prefix="codenav_")
|
||||
try:
|
||||
with open(os.path.join(root, "a.py"), "w") as f:
|
||||
f.write("import os\n# needle here\nprint('x')\n")
|
||||
os.mkdir(os.path.join(root, "sub"))
|
||||
with open(os.path.join(root, "sub", "b.txt"), "w") as f:
|
||||
f.write("nothing\nNEEDLE upper\n")
|
||||
os.mkdir(os.path.join(root, "node_modules"))
|
||||
with open(os.path.join(root, "node_modules", "dep.py"), "w") as f:
|
||||
f.write("needle in dep\n")
|
||||
g = os.path.join(root, ".git")
|
||||
os.mkdir(g)
|
||||
with open(os.path.join(g, "config"), "w") as f:
|
||||
f.write("needle in git\n")
|
||||
yield root
|
||||
finally:
|
||||
shutil.rmtree(root, ignore_errors=True)
|
||||
|
||||
|
||||
# ── grep ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_grep_finds_match(repo):
|
||||
r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert "a.py:2:" in r["output"]
|
||||
|
||||
|
||||
def test_grep_skips_junk_dirs(repo):
|
||||
r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}')
|
||||
assert "node_modules" not in r["output"]
|
||||
assert ".git/config" not in r["output"]
|
||||
|
||||
|
||||
def test_grep_ignore_case(repo):
|
||||
r = _run("grep", f'{{"pattern": "needle", "ignore_case": true, "path": "{repo}"}}')
|
||||
assert "b.txt:2:" in r["output"]
|
||||
|
||||
|
||||
def test_grep_glob_filter(repo):
|
||||
r = _run("grep", f'{{"pattern": "needle", "ignore_case": true, "glob": "*.py", "path": "{repo}"}}')
|
||||
assert "a.py" in r["output"]
|
||||
assert "b.txt" not in r["output"]
|
||||
|
||||
|
||||
def test_grep_no_match(repo):
|
||||
r = _run("grep", f'{{"pattern": "zzzznotfound", "path": "{repo}"}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert "No matches" in r["output"]
|
||||
|
||||
|
||||
def test_grep_requires_pattern(repo):
|
||||
r = _run("grep", "{}")
|
||||
assert r["exit_code"] == 1
|
||||
assert "pattern is required" in r["error"]
|
||||
|
||||
|
||||
def test_grep_path_outside_roots_rejected(repo):
|
||||
r = _run("grep", '{"pattern": "x", "path": "/etc"}')
|
||||
assert r["exit_code"] == 1
|
||||
assert "outside the allowed roots" in r["error"]
|
||||
|
||||
|
||||
def test_grep_python_fallback_when_no_rg(repo, monkeypatch):
|
||||
monkeypatch.setattr(shutil, "which", lambda name: None)
|
||||
r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert "a.py:2:" in r["output"]
|
||||
assert "node_modules" not in r["output"]
|
||||
assert ".git/config" not in r["output"]
|
||||
|
||||
|
||||
# ── glob ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_glob_py(repo):
|
||||
r = _run("glob", f'{{"pattern": "*.py", "path": "{repo}"}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert "a.py" in r["output"]
|
||||
|
||||
|
||||
def test_glob_recursive_skips_junk(repo):
|
||||
r = _run("glob", f'{{"pattern": "**/*.py", "path": "{repo}"}}')
|
||||
assert "a.py" in r["output"]
|
||||
assert "node_modules" not in r["output"]
|
||||
|
||||
|
||||
def test_glob_requires_pattern(repo):
|
||||
r = _run("glob", "{}")
|
||||
assert r["exit_code"] == 1
|
||||
|
||||
|
||||
# ── ls ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_ls_lists_entries(repo):
|
||||
r = _run("ls", f'{{"path": "{repo}"}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert "a.py" in r["output"]
|
||||
assert "sub/" in r["output"]
|
||||
assert ".git" not in r["output"] # hidden skipped
|
||||
|
||||
|
||||
def test_ls_path_outside_rejected(repo):
|
||||
r = _run("ls", '{"path": "/etc"}')
|
||||
assert r["exit_code"] == 1
|
||||
assert "outside the allowed roots" in r["error"]
|
||||
|
||||
|
||||
# ── read_file line range ───────────────────────────────────────────────────
|
||||
|
||||
def test_read_file_offset_limit(repo):
|
||||
p = os.path.join(repo, "lines.txt")
|
||||
with open(p, "w") as f:
|
||||
f.write("\n".join(f"line{i}" for i in range(1, 11)) + "\n")
|
||||
r = _run("read_file", f'{{"path": "{p}", "offset": 3, "limit": 2}}')
|
||||
assert r["exit_code"] == 0
|
||||
assert r["output"] == "line3\nline4\n"
|
||||
|
||||
|
||||
def test_read_file_plain_path_backcompat(repo):
|
||||
r = _run("read_file", os.path.join(repo, "a.py"))
|
||||
assert r["exit_code"] == 0
|
||||
assert "needle" in r["output"]
|
||||
Reference in New Issue
Block a user