diff --git a/src/agent_tools.py b/src/agent_tools.py index f93df01..578b943 100644 --- a/src/agent_tools.py +++ b/src/agent_tools.py @@ -27,6 +27,7 @@ MAX_READ_CHARS = 20_000 # Tool types that trigger execution TOOL_TAGS = {"bash", "python", "web_search", "web_fetch", "read_file", "write_file", "edit_file", + "grep", "glob", "ls", "create_document", "update_document", "edit_document", "search_chats", "chat_with_model", "create_session", "list_sessions", diff --git a/src/tool_execution.py b/src/tool_execution.py index 626bf5d..895340f 100644 --- a/src/tool_execution.py +++ b/src/tool_execution.py @@ -288,6 +288,34 @@ def get_mcp_manager(): return agent_tools.get_mcp_manager() +# Directories ignored by the code-nav tools' Python fallbacks so results aren't +# polluted by VCS internals / dependency trees / build caches. ripgrep already +# honours .gitignore; this is the parity floor for the no-rg path (and the +# explicit excludes passed to rg so it skips them even without a .gitignore). +_CODENAV_SKIP_DIRS = frozenset({ + ".git", ".hg", ".svn", "node_modules", "venv", ".venv", "__pycache__", + ".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build", + ".next", ".cache", "site-packages", ".idea", ".tox", +}) +# Per-tool result caps (keep tool output cheap + model-friendly). +_CODENAV_MAX_HITS = 200 +_CODENAV_MAX_LINE = 400 + + +def _resolve_search_root(raw_path: str) -> str: + """Resolve + confine a code-nav path (grep/glob/ls). + + Empty path → the agent's primary root (first allowlisted root, i.e. the + project data dir). A supplied path is confined by the same allowlist + + sensitive-file policy as read_file (_resolve_tool_path). + """ + raw = (raw_path or "").strip() + if not raw: + roots = _tool_path_roots() + return roots[0] if roots else os.path.realpath(".") + return _resolve_tool_path(raw) + + def _truncate(text: str, limit: int = MAX_OUTPUT_CHARS) -> str: if len(text) > limit: return text[:limit] + f"\n... (truncated, {len(text)} chars total)" @@ -614,14 +642,42 @@ async def _direct_fallback( return {"output": output or "(no output)", "exit_code": rc or 0} if tool == "read_file": - raw_path = content.split("\n", 1)[0].strip() + # Args: plain path on line 1 (back-compat) OR JSON + # {path, offset?, limit?} where offset/limit are a 1-based line range. + raw_path, offset, limit = content.split("\n", 1)[0].strip(), 0, 0 + _stripped = content.strip() + if _stripped.startswith("{"): + try: + _a = _json.loads(_stripped) + raw_path = str(_a.get("path", "")).strip() + offset = int(_a.get("offset") or 0) + limit = int(_a.get("limit") or 0) + except (_json.JSONDecodeError, TypeError, ValueError): + pass try: path = _resolve_tool_path(raw_path) except ValueError as e: return {"error": f"read_file: {e}", "exit_code": 1} try: - # Run blocking read in a thread to keep the loop responsive + # Run blocking read in a thread to keep the loop responsive. def _read(): + if offset > 0 or limit > 0: + # Line-range read: slice [offset, offset+limit). + start = max(offset, 1) + out, n, budget = [], 0, MAX_READ_CHARS + with open(path, "r", encoding="utf-8", errors="replace") as f: + for i, line in enumerate(f, 1): + if i < start: + continue + if limit > 0 and n >= limit: + break + out.append(line) + n += 1 + budget -= len(line) + if budget <= 0: + out.append(f"\n... [truncated at {MAX_READ_CHARS} chars]") + break + return "".join(out) with open(path, "r", encoding="utf-8", errors="replace") as f: return f.read(MAX_READ_CHARS + 1) data = await asyncio.to_thread(_read) @@ -629,10 +685,11 @@ async def _direct_fallback( return {"error": f"read_file: {path}: not found", "exit_code": 1} except PermissionError: return {"error": f"read_file: {path}: permission denied", "exit_code": 1} + except IsADirectoryError: + return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1} except OSError as e: return {"error": f"read_file: {path}: {e}", "exit_code": 1} - truncated = len(data) > MAX_READ_CHARS - if truncated: + if not (offset > 0 or limit > 0) and len(data) > MAX_READ_CHARS: data = data[:MAX_READ_CHARS] + f"\n... [truncated at {MAX_READ_CHARS} chars]" return {"output": data, "exit_code": 0} @@ -671,6 +728,196 @@ async def _direct_fallback( result["diff"] = diff return result + if tool == "grep": + # Args (JSON): {pattern, path?, glob?, ignore_case?, max_results?}. + # Bare string → treated as the pattern. + args: Dict[str, Any] = {} + _s = (content or "").strip() + if _s.startswith("{"): + try: + args = _json.loads(_s) + except _json.JSONDecodeError: + args = {} + else: + args = {"pattern": _s} + pattern = str(args.get("pattern", "")).strip() + if not pattern: + return {"error": "grep: pattern is required", "exit_code": 1} + ignore_case = bool(args.get("ignore_case")) + glob_pat = str(args.get("glob", "") or "").strip() + try: + max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS) + except (TypeError, ValueError): + max_hits = _CODENAV_MAX_HITS + max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS)) + try: + root = _resolve_search_root(str(args.get("path", ""))) + except ValueError as e: + return {"error": f"grep: {e}", "exit_code": 1} + + def _grep(): + import re as _re + import shutil + rg = shutil.which("rg") + if rg: + cmd = [rg, "--line-number", "--no-heading", "--color=never", + "--max-count", str(max_hits)] + if ignore_case: + cmd.append("--ignore-case") + if glob_pat: + cmd += ["--glob", glob_pat] + # Exclude junk dirs even when the tree has no .gitignore, so + # results match the Python fallback's skip set. + for _d in _CODENAV_SKIP_DIRS: + cmd += ["--glob", f"!**/{_d}/**"] + cmd += ["--regexp", pattern, root] + try: + import subprocess + p = subprocess.run(cmd, capture_output=True, text=True, timeout=20) + lines = [ln for ln in (p.stdout or "").splitlines() if ln][:max_hits] + return lines, None + except subprocess.TimeoutExpired: + return None, "grep: timed out" + except Exception as _e: + return None, f"grep: {_e}" + # Python fallback (no ripgrep): walk + regex. + try: + rx = _re.compile(pattern, _re.IGNORECASE if ignore_case else 0) + except _re.error as _e: + return None, f"grep: bad pattern: {_e}" + import fnmatch + hits = [] + if os.path.isfile(root): + file_iter = [root] + else: + file_iter = [] + for dp, dns, fns in os.walk(root): + dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS] + for fn in fns: + if glob_pat and not fnmatch.fnmatch(fn, glob_pat): + continue + file_iter.append(os.path.join(dp, fn)) + for fp in file_iter: + if len(hits) >= max_hits: + break + try: + with open(fp, "r", encoding="utf-8", errors="strict") as f: + for i, line in enumerate(f, 1): + if rx.search(line): + hits.append(f"{fp}:{i}:{line.rstrip()[:_CODENAV_MAX_LINE]}") + if len(hits) >= max_hits: + break + except (UnicodeDecodeError, OSError): + continue # skip binary / unreadable + return hits, None + + lines, err = await asyncio.to_thread(_grep) + if err: + return {"error": err, "exit_code": 1} + if not lines: + return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0} + out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines) + if len(lines) >= max_hits: + out += f"\n... [capped at {max_hits} matches]" + return {"output": _truncate(out), "exit_code": 0} + + if tool == "glob": + args = {} + _s = (content or "").strip() + if _s.startswith("{"): + try: + args = _json.loads(_s) + except _json.JSONDecodeError: + args = {} + else: + args = {"pattern": _s} + pattern = str(args.get("pattern", "")).strip() + if not pattern: + return {"error": "glob: pattern is required", "exit_code": 1} + try: + root = _resolve_search_root(str(args.get("path", ""))) + except ValueError as e: + return {"error": f"glob: {e}", "exit_code": 1} + + def _glob(): + from pathlib import Path + base = Path(root) + if not base.is_dir(): + return None, f"glob: {root}: not a directory" + matched = [] + try: + for p in base.rglob(pattern): + if set(p.relative_to(base).parts) & _CODENAV_SKIP_DIRS: + continue + try: + mtime = p.stat().st_mtime + except OSError: + mtime = 0 + matched.append((mtime, str(p))) + if len(matched) > _CODENAV_MAX_HITS * 5: + break + except (OSError, ValueError) as _e: + return None, f"glob: {_e}" + matched.sort(key=lambda t: t[0], reverse=True) # newest first + return [pth for _, pth in matched[:_CODENAV_MAX_HITS]], None + + paths, err = await asyncio.to_thread(_glob) + if err: + return {"error": err, "exit_code": 1} + if not paths: + return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0} + out = "\n".join(paths) + if len(paths) >= _CODENAV_MAX_HITS: + out += f"\n... [capped at {_CODENAV_MAX_HITS} files]" + return {"output": _truncate(out), "exit_code": 0} + + if tool == "ls": + raw_path = "" + _s = (content or "").strip() + if _s.startswith("{"): + try: + raw_path = str(_json.loads(_s).get("path", "")).strip() + except _json.JSONDecodeError: + raw_path = "" + else: + raw_path = _s.split("\n", 1)[0].strip() + try: + root = _resolve_search_root(raw_path) + except ValueError as e: + return {"error": f"ls: {e}", "exit_code": 1} + + def _ls(): + if not os.path.isdir(root): + return None, f"ls: {root}: not a directory" + rows = [] + try: + with os.scandir(root) as it: + for entry in it: + if entry.name.startswith("."): + continue + try: + is_dir = entry.is_dir(follow_symlinks=False) + size = entry.stat(follow_symlinks=False).st_size if not is_dir else 0 + except OSError: + continue + rows.append((is_dir, entry.name, size)) + except (PermissionError, OSError) as _e: + return None, f"ls: {_e}" + rows.sort(key=lambda r: (not r[0], r[1].lower())) # dirs first, then name + lines = [f"{root}:"] + for is_dir, name, size in rows[:_CODENAV_MAX_HITS]: + lines.append(f" {name}/" if is_dir else f" {name} ({size} B)") + if len(rows) > _CODENAV_MAX_HITS: + lines.append(f" ... [{len(rows) - _CODENAV_MAX_HITS} more]") + if not rows: + lines.append(" (empty)") + return "\n".join(lines), None + + out, err = await asyncio.to_thread(_ls) + if err: + return {"error": err, "exit_code": 1} + return {"output": _truncate(out), "exit_code": 0} + if tool == "web_search": from src.search import comprehensive_web_search raw = content.strip() @@ -909,6 +1156,12 @@ async def execute_tool_block( first_line = content.split(chr(10))[0][:80] desc = f"{tool}: {first_line}" result = await _call_mcp_tool(tool, content, progress_cb=progress_cb) + elif tool in ("grep", "glob", "ls"): + # Code-navigation tools — no MCP server; run the direct implementation. + first_line = content.split(chr(10))[0][:80] + desc = f"{tool}: {first_line}" + result = await _direct_fallback(tool, content, progress_cb=progress_cb) \ + or {"error": f"{tool}: execution failed", "exit_code": 1} elif tool == "create_document": title = content.split("\n")[0].strip()[:60] desc = f"create_document: {title}" diff --git a/src/tool_index.py b/src/tool_index.py index 04435ad..3c277b9 100644 --- a/src/tool_index.py +++ b/src/tool_index.py @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) # These are the most commonly needed and should never be missing. ALWAYS_AVAILABLE = frozenset({ "bash", "python", "web_search", "web_fetch", "read_file", + "grep", "glob", "ls", # code-navigation tools (admin-gated by tool_security) "api_call", # For configured integrations (Miniflux, Gitea, Linkding, etc.) # The two genuinely AMBIENT cookbook tools — "what's running" and # "kill it" can be asked any time without prior cookbook context, @@ -63,7 +64,10 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = { "python": "Execute Python code for computation, data processing, math, scripting, parsing, API calls. Not for writing code for the user.", "web_search": "Quick single web lookup for a fact, current event, or doc mid-task. NOT for 'research X' / 'do research on X' requests — those are deep-research jobs (use trigger_research). web_search = one query; trigger_research = a full researched report in the sidebar.", "web_fetch": "Fetch and read the text content of a specific URL/website the user names (e.g. 'check example.com', 'open this link'). Use when you have a concrete URL; for open-ended lookups use web_search instead.", - "read_file": "Read a file from disk and return its contents. View source code, config files, logs.", + "read_file": "Read a file from disk and return its contents. View source code, config files, logs. Supports an optional line range (offset/limit) for large files.", + "grep": "Search file CONTENTS for a regex across a directory tree (ripgrep-backed, honours .gitignore). Returns file:line:match. Use to find where code/symbols/strings live — prefer over bash grep.", + "glob": "Find FILES by glob pattern (e.g. '**/*.py'), newest first. Use to locate files by name/extension — prefer over bash find/ls.", + "ls": "List a directory's entries (folders then files with sizes). Use to see what's in a folder — prefer over bash ls.", "write_file": "Write/create or fully rewrite a file ON DISK (source code, configs, project files). Use for new files or full rewrites — NOT create_document (editor panel) and NOT a bash heredoc.", "edit_file": "Edit an existing file ON DISK by exact string replacement (fix a bug, change a function). Shows a diff. The tool for changing files on disk — NOT edit_document (editor panel) and NOT bash sed/heredoc.", "create_document": "Create a new document in the editor panel. For code, articles, text content longer than 15 lines, unless an already-open document/email draft is the obvious target. If an email compose draft is open, edit that draft instead of creating another document.", diff --git a/src/tool_schemas.py b/src/tool_schemas.py index 05134ae..d315111 100644 --- a/src/tool_schemas.py +++ b/src/tool_schemas.py @@ -82,16 +82,65 @@ FUNCTION_TOOL_SCHEMAS = [ "type": "function", "function": { "name": "read_file", - "description": "Read a file from disk", + "description": "Read a file from disk. Optionally read a line range with offset/limit for large files.", "parameters": { "type": "object", "properties": { - "path": {"type": "string", "description": "File path to read"} + "path": {"type": "string", "description": "File path to read"}, + "offset": {"type": "integer", "description": "1-based line to start reading from (optional)"}, + "limit": {"type": "integer", "description": "Max number of lines to read from offset (optional)"} }, "required": ["path"] } } }, + { + "type": "function", + "function": { + "name": "grep", + "description": "Search file contents for a regular expression across a directory tree (uses ripgrep when available, respecting .gitignore). Returns file:line:match. PREFER this over `bash grep/rg` for code search — confined to the allowed roots, structured output.", + "parameters": { + "type": "object", + "properties": { + "pattern": {"type": "string", "description": "Regular expression to search for"}, + "path": {"type": "string", "description": "Directory or file to search (optional; defaults to the project root)"}, + "glob": {"type": "string", "description": "Only search files matching this glob, e.g. '*.py' (optional)"}, + "ignore_case": {"type": "boolean", "description": "Case-insensitive match (optional)"}, + "max_results": {"type": "integer", "description": "Max matches to return (optional)"} + }, + "required": ["pattern"] + } + } + }, + { + "type": "function", + "function": { + "name": "glob", + "description": "Find files by glob pattern (recursive), newest first. e.g. '**/*.py'. PREFER this over `bash find/ls` for locating files — confined to the allowed roots.", + "parameters": { + "type": "object", + "properties": { + "pattern": {"type": "string", "description": "Glob pattern, e.g. '**/*.ts' or 'src/**/test_*.py'"}, + "path": {"type": "string", "description": "Base directory (optional; defaults to the project root)"} + }, + "required": ["pattern"] + } + } + }, + { + "type": "function", + "function": { + "name": "ls", + "description": "List the entries of a directory (folders first, then files with sizes). PREFER this over `bash ls` — confined to the allowed roots.", + "parameters": { + "type": "object", + "properties": { + "path": {"type": "string", "description": "Directory to list (optional; defaults to the project root)"} + }, + "required": [] + } + } + }, { "type": "function", "function": { @@ -1128,7 +1177,13 @@ def function_call_to_tool_block(name: str, arguments: str) -> Optional[ToolBlock else: content = args.get("query", "") elif tool_type == "read_file": - content = args.get("path", "") + # Plain path (back-compat) unless a line range is requested → JSON. + if args.get("offset") or args.get("limit"): + content = json.dumps(args) + else: + content = args.get("path", "") + elif tool_type in ("grep", "glob", "ls"): + content = json.dumps(args) if args else "{}" elif tool_type == "write_file": content = args.get("path", "") + "\n" + args.get("content", "") elif tool_type == "edit_file": diff --git a/src/tool_security.py b/src/tool_security.py index dd2ce83..8ffa50f 100644 --- a/src/tool_security.py +++ b/src/tool_security.py @@ -17,6 +17,9 @@ NON_ADMIN_BLOCKED_TOOLS = { "read_file", "write_file", "edit_file", + "grep", + "glob", + "ls", "search_chats", "manage_memory", "manage_skills", diff --git a/tests/test_code_nav_tools.py b/tests/test_code_nav_tools.py new file mode 100644 index 0000000..40e9b2b --- /dev/null +++ b/tests/test_code_nav_tools.py @@ -0,0 +1,140 @@ +"""Tests for the code-navigation tools (grep, glob, ls) + read_file line range.""" +import os +import shutil +import asyncio +import tempfile +import pytest + +os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/test_code_nav.db") + +from src.tool_execution import _direct_fallback + + +def _run(tool, content): + return asyncio.run(_direct_fallback(tool, content)) + + +@pytest.fixture +def repo(): + # Built under /tmp, which is on the default tool-path allowlist. + root = tempfile.mkdtemp(dir="/tmp", prefix="codenav_") + try: + with open(os.path.join(root, "a.py"), "w") as f: + f.write("import os\n# needle here\nprint('x')\n") + os.mkdir(os.path.join(root, "sub")) + with open(os.path.join(root, "sub", "b.txt"), "w") as f: + f.write("nothing\nNEEDLE upper\n") + os.mkdir(os.path.join(root, "node_modules")) + with open(os.path.join(root, "node_modules", "dep.py"), "w") as f: + f.write("needle in dep\n") + g = os.path.join(root, ".git") + os.mkdir(g) + with open(os.path.join(g, "config"), "w") as f: + f.write("needle in git\n") + yield root + finally: + shutil.rmtree(root, ignore_errors=True) + + +# ── grep ────────────────────────────────────────────────────────────────── + +def test_grep_finds_match(repo): + r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}') + assert r["exit_code"] == 0 + assert "a.py:2:" in r["output"] + + +def test_grep_skips_junk_dirs(repo): + r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}') + assert "node_modules" not in r["output"] + assert ".git/config" not in r["output"] + + +def test_grep_ignore_case(repo): + r = _run("grep", f'{{"pattern": "needle", "ignore_case": true, "path": "{repo}"}}') + assert "b.txt:2:" in r["output"] + + +def test_grep_glob_filter(repo): + r = _run("grep", f'{{"pattern": "needle", "ignore_case": true, "glob": "*.py", "path": "{repo}"}}') + assert "a.py" in r["output"] + assert "b.txt" not in r["output"] + + +def test_grep_no_match(repo): + r = _run("grep", f'{{"pattern": "zzzznotfound", "path": "{repo}"}}') + assert r["exit_code"] == 0 + assert "No matches" in r["output"] + + +def test_grep_requires_pattern(repo): + r = _run("grep", "{}") + assert r["exit_code"] == 1 + assert "pattern is required" in r["error"] + + +def test_grep_path_outside_roots_rejected(repo): + r = _run("grep", '{"pattern": "x", "path": "/etc"}') + assert r["exit_code"] == 1 + assert "outside the allowed roots" in r["error"] + + +def test_grep_python_fallback_when_no_rg(repo, monkeypatch): + monkeypatch.setattr(shutil, "which", lambda name: None) + r = _run("grep", f'{{"pattern": "needle", "path": "{repo}"}}') + assert r["exit_code"] == 0 + assert "a.py:2:" in r["output"] + assert "node_modules" not in r["output"] + assert ".git/config" not in r["output"] + + +# ── glob ────────────────────────────────────────────────────────────────── + +def test_glob_py(repo): + r = _run("glob", f'{{"pattern": "*.py", "path": "{repo}"}}') + assert r["exit_code"] == 0 + assert "a.py" in r["output"] + + +def test_glob_recursive_skips_junk(repo): + r = _run("glob", f'{{"pattern": "**/*.py", "path": "{repo}"}}') + assert "a.py" in r["output"] + assert "node_modules" not in r["output"] + + +def test_glob_requires_pattern(repo): + r = _run("glob", "{}") + assert r["exit_code"] == 1 + + +# ── ls ──────────────────────────────────────────────────────────────────── + +def test_ls_lists_entries(repo): + r = _run("ls", f'{{"path": "{repo}"}}') + assert r["exit_code"] == 0 + assert "a.py" in r["output"] + assert "sub/" in r["output"] + assert ".git" not in r["output"] # hidden skipped + + +def test_ls_path_outside_rejected(repo): + r = _run("ls", '{"path": "/etc"}') + assert r["exit_code"] == 1 + assert "outside the allowed roots" in r["error"] + + +# ── read_file line range ─────────────────────────────────────────────────── + +def test_read_file_offset_limit(repo): + p = os.path.join(repo, "lines.txt") + with open(p, "w") as f: + f.write("\n".join(f"line{i}" for i in range(1, 11)) + "\n") + r = _run("read_file", f'{{"path": "{p}", "offset": 3, "limit": 2}}') + assert r["exit_code"] == 0 + assert r["output"] == "line3\nline4\n" + + +def test_read_file_plain_path_backcompat(repo): + r = _run("read_file", os.path.join(repo, "a.py")) + assert r["exit_code"] == 0 + assert "needle" in r["output"]