From 2be3779e6ed328cf4b71195039292e689930ee6b Mon Sep 17 00:00:00 2001 From: Kenny Van de Maele Date: Fri, 5 Jun 2026 00:06:37 +0200 Subject: [PATCH] feat: Add workspace: confine agent tools to a folder (#1103) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Add workspace: confine agent tools to a folder Pick a server folder as the agent's workspace so its file/shell tools work there and don't touch files outside it. File tools are hard-confined; bash/ python run with cwd set to the folder. Includes a slash command: `/workspace` (alias `/ws`) — show / `set ` / `clear` / `pick` (open the directory browser). - routes/workspace_routes.py: GET /api/workspace/browse (admin-only). - src/tool_execution.py: hard path confinement for read_file/write_file; bash/python cwd. Threaded route → stream_agent_loop → execute_tool_block. - src/agent_loop.py: workspace note prepended to the system prompt. - static/: overflow menu item, input-bar pill, directory-browser modal, and the /workspace slash command. - tests/test_workspace_confine.py. * Wire workspace confinement into tools that landed after this PR edit_file (#1239) and grep/glob/ls (#1670) merged after workspace-confine was written, so they bypassed the workspace boundary. Thread the workspace through: - edit_file: _do_edit_file resolves via _resolve_tool_path_in_workspace - grep/glob/ls: _resolve_search_root confines to the workspace (root + paths) - bash/python/bg cwd: workspace or _AGENT_WORKDIR (keep the #2586 data-dir default when no workspace is set) Tests cover edit_file + grep/ls confinement (inside ok, outside rejected). * Workspace picker: editable path bar + modal style cohesion + cross-platform hardening - Make the current-folder strip an editable address bar: type/paste a full path and press Enter to navigate (also reaches other Windows drives and hidden dirs the up-only browser cannot). - Reuse shared modal CSS: drop bespoke .workspace-modal-content/.workspace-btn* in favour of base .modal-content/.modal-body and the .confirm-btn button family; separators/hover use var(--border). Net -31 CSS lines. - Fix the path field overflowing the modal right edge (flex stretch + margin vs an overflow:auto scrollbar-feedback loop): full-bleed, no h-margin. - Cross-platform confinement: normcase the workspace commonpath check so containment holds on case-insensitive filesystems (Windows/macOS). - Make tests OS-portable: sibling temp dirs instead of /etc, python os.getcwd() instead of pwd. 5 pass. --- app.py | 3 + routes/chat_routes.py | 8 ++ routes/workspace_routes.py | 56 +++++++++++ src/agent_loop.py | 23 +++++ src/tool_execution.py | 88 ++++++++++++++---- static/app.js | 2 + static/index.html | 15 ++- static/js/chat.js | 4 + static/js/slashCommands.js | 38 ++++++++ static/js/storage.js | 3 +- static/js/workspace.js | 160 ++++++++++++++++++++++++++++++++ static/style.css | 43 +++++++++ tests/test_workspace_confine.py | 128 +++++++++++++++++++++++++ 13 files changed, 549 insertions(+), 22 deletions(-) create mode 100644 routes/workspace_routes.py create mode 100644 static/js/workspace.js create mode 100644 tests/test_workspace_confine.py diff --git a/app.py b/app.py index 4120be9..b34b818 100644 --- a/app.py +++ b/app.py @@ -525,6 +525,9 @@ upload_cleanup_task = None from routes.emoji_routes import setup_emoji_routes app.include_router(setup_emoji_routes()) +from routes.workspace_routes import setup_workspace_routes +app.include_router(setup_workspace_routes()) + # Sessions from routes.session_routes import setup_session_routes session_config = {"REQUEST_TIMEOUT": REQUEST_TIMEOUT, "OPENAI_API_KEY": OPENAI_API_KEY, "SESSIONS_FILE": SESSIONS_FILE} diff --git a/routes/chat_routes.py b/routes/chat_routes.py index 836e9da..a18a1a6 100644 --- a/routes/chat_routes.py +++ b/routes/chat_routes.py @@ -2,6 +2,7 @@ import asyncio import json +import os import time import logging from datetime import datetime @@ -394,6 +395,12 @@ def setup_chat_routes( compare_mode = str(form_data.get("compare_mode", "")).lower() == "true" incognito = str(form_data.get("incognito", "")).lower() == "true" chat_mode = str(form_data.get("mode", "")).lower() # 'chat' or 'agent' + # Workspace: confine the agent's file/shell tools to this folder. Validate + # it's a real directory; ignore (no confinement) otherwise. + workspace = (form_data.get("workspace") or "").strip() + if workspace: + _ws_real = os.path.realpath(os.path.expanduser(workspace)) + workspace = _ws_real if os.path.isdir(_ws_real) else "" # Did the USER explicitly pick agent mode? (vs. us auto-escalating # below). Skill extraction should only learn from real agent sessions, # not chats we quietly promoted for a notes/calendar intent. @@ -1007,6 +1014,7 @@ def setup_chat_routes( disabled_tools=disabled_tools if disabled_tools else None, owner=_user, fallbacks=_fallback_candidates, + workspace=workspace or None, ): if chunk.startswith("data: ") and not chunk.startswith("data: [DONE]"): try: diff --git a/routes/workspace_routes.py b/routes/workspace_routes.py new file mode 100644 index 0000000..f7b27fb --- /dev/null +++ b/routes/workspace_routes.py @@ -0,0 +1,56 @@ +"""Workspace API — browse server directories to pick a tool workspace folder.""" +import os +from fastapi import APIRouter, Request, HTTPException, Query + +from src.auth_helpers import get_current_user +from src.tool_security import owner_is_admin_or_single_user + + +def setup_workspace_routes(): + router = APIRouter(prefix="/api/workspace", tags=["workspace"]) + + @router.get("/browse") + def browse(request: Request, path: str = Query(default="")): + """List subdirectories of `path` (default: home) so the UI can navigate + the server filesystem and pick a workspace folder. Directories only. + + ADMIN-ONLY: this enumerates the server filesystem, so it is gated the + same way the file/shell tools are (read_file/write_file/bash are in + NON_ADMIN_BLOCKED_TOOLS). A non-admin who can't use those tools must not + be able to map the host's directory tree either. + """ + owner = get_current_user(request) + if not owner_is_admin_or_single_user(owner): + raise HTTPException(status_code=403, detail="Workspace browsing is admin-only") + + # Resolve symlinks so the reported path is canonical and the UI navigates + # real directories (defends against symlink games in displayed paths). + target = os.path.realpath(os.path.expanduser(path.strip() or "~")) + if not os.path.isdir(target): + target = os.path.realpath(os.path.expanduser("~")) + + dirs = [] + try: + with os.scandir(target) as it: + for entry in it: + try: + # Don't follow symlinks when classifying — a symlinked + # dir is skipped rather than letting the browser wander + # off via a link. Hidden entries are omitted. + if entry.is_dir(follow_symlinks=False) and not entry.name.startswith("."): + # Build the child path server-side with os.path.join + # so it's correct on Windows (backslashes) and Linux. + dirs.append({"name": entry.name, "path": os.path.join(target, entry.name)}) + except OSError: + continue + except (PermissionError, OSError): + dirs = [] + + parent = os.path.dirname(target) + return { + "path": target, + "parent": parent if parent and parent != target else None, + "dirs": sorted(dirs, key=lambda d: d["name"].lower()), + } + + return router diff --git a/src/agent_loop.py b/src/agent_loop.py index dcca097..eabc340 100644 --- a/src/agent_loop.py +++ b/src/agent_loop.py @@ -1387,6 +1387,7 @@ async def stream_agent_loop( owner: Optional[str] = None, relevant_tools: Optional[Set[str]] = None, fallbacks: Optional[List[tuple]] = None, + workspace: Optional[str] = None, _is_teacher_run: bool = False, ) -> AsyncGenerator[str, None]: """Streaming agent loop generator. @@ -1553,6 +1554,27 @@ async def stream_agent_loop( compact=_is_api_model, owner=owner, ) + if workspace: + # PREPEND (not append) so it dominates the large base prompt — appended + # at the end, small models ignored it and asked the user for code. The + # folder IS the project; the agent must explore it, not ask. + _ws_note = ( + f"## ACTIVE WORKSPACE — READ FIRST\n" + f"The user is working in this folder: {workspace}\n" + f"It IS the project. bash/python run with cwd set here and " + f"read_file/write_file are confined to it (paths outside are rejected).\n" + f"When the user says \"the code\" / \"this project\" / \"the workspace\" " + f"or asks to review/find/edit something WITHOUT a path, they mean THIS " + f"folder. Do NOT ask the user for code or a path, and do NOT read a file " + f"literally named \"workspace\". ALWAYS start by exploring it yourself: " + f"run `bash` → `git ls-files` (or `ls -R`) to see the files, then " + f"read_file the relevant ones by path RELATIVE to the workspace." + ) + if messages and messages[0].get("role") == "system": + messages[0]["content"] = _ws_note + "\n\n" + (messages[0].get("content") or "") + else: + messages.insert(0, {"role": "system", "content": _ws_note}) + logger.info("[workspace] active for this turn: %s", workspace) prep_timings["prompt_build"] = time.time() - _t2 _t3 = time.time() @@ -2117,6 +2139,7 @@ async def stream_agent_loop( disabled_tools=disabled_tools, owner=owner, progress_cb=_push_progress, + workspace=workspace, ) finally: # Sentinel so the drainer knows to stop. diff --git a/src/tool_execution.py b/src/tool_execution.py index 41b81c8..a667266 100644 --- a/src/tool_execution.py +++ b/src/tool_execution.py @@ -67,12 +67,13 @@ def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]: } -async def _do_edit_file(content: str) -> Dict[str, Any]: +async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[str, Any]: """Exact string-replacement edit of an on-disk file. content is JSON: {"path", "old_string", "new_string", "replace_all"?}. Fails if old_string is missing or non-unique (unless replace_all) so the model can't silently edit the wrong place. Returns a unified diff for the UI. + Confined to the workspace when one is set (same policy as write_file). """ try: args = json.loads(content) if content.strip().startswith("{") else {} @@ -84,9 +85,11 @@ async def _do_edit_file(content: str) -> Dict[str, Any]: replace_all = bool(args.get("replace_all", False)) if not raw_path: return {"error": "edit_file: path required", "exit_code": 1} - # Confine to the same allowlist + sensitive-file policy as read/write_file. + # Confine to the workspace when set, else the same allowlist + sensitive-file + # policy as read/write_file. try: - path = _resolve_tool_path(raw_path) + path = (_resolve_tool_path_in_workspace(workspace, raw_path) + if workspace else _resolve_tool_path(raw_path)) except ValueError as e: return {"error": f"edit_file: {e}", "exit_code": 1} if old == "": @@ -268,6 +271,40 @@ def _resolve_tool_path(raw_path: str) -> str: f"path '{raw_path}' is outside the allowed roots" ) + +def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str: + """Confine a model-supplied path to the active workspace. + + Layered on top of upstream's path policy: the workspace is the allowed + root (relative paths resolve under it; paths that escape it are rejected), + and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies + inside it. When no workspace is set, callers use _resolve_tool_path (the + default data/tmp allowlist) instead. + """ + if raw_path is None or not str(raw_path).strip(): + raise ValueError("path is required") + base = os.path.realpath(workspace) + expanded = os.path.expanduser(str(raw_path).strip()) + candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded) + resolved = os.path.realpath(candidate) + if _is_sensitive_path(resolved): + raise ValueError( + f"path '{raw_path}' is inside a sensitive directory " + f"(e.g. .ssh, .gnupg) or matches a sensitive filename" + ) + if resolved != base: + # normcase so containment holds on case-insensitive filesystems + # (Windows, default macOS): it lowercases on Windows and is a no-op on + # POSIX. commonpath raises ValueError across Windows drives (C: vs D:) + # or mixed abs/rel — both mean "outside", so the except rejects them. + nbase = os.path.normcase(base) + try: + if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase: + raise ValueError + except ValueError: + raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})") + return resolved + # Bash + python tools used to share a single 60s timeout. That's # enough for one-shot commands but starves real workloads (pip # install, ffmpeg conversions, etc.) — and worse, the agent saw the @@ -310,14 +347,19 @@ _CODENAV_MAX_HITS = 200 _CODENAV_MAX_LINE = 400 -def _resolve_search_root(raw_path: str) -> str: +def _resolve_search_root(raw_path: str, workspace: Optional[str] = None) -> str: """Resolve + confine a code-nav path (grep/glob/ls). - Empty path → the agent's primary root (first allowlisted root, i.e. the - project data dir). A supplied path is confined by the same allowlist + - sensitive-file policy as read_file (_resolve_tool_path). + With a workspace set, the workspace folder is the root and supplied paths are + confined inside it (same policy as read_file). Without one, an empty path + defaults to the agent's primary root (project data dir) and a supplied path + is confined by the global allowlist + sensitive-file policy. """ raw = (raw_path or "").strip() + if workspace: + if not raw: + return os.path.realpath(workspace) + return _resolve_tool_path_in_workspace(workspace, raw) if not raw: roots = _tool_path_roots() return roots[0] if roots else os.path.realpath(".") @@ -534,11 +576,12 @@ async def _call_mcp_tool( tool: str, content: str, progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None, + workspace: Optional[str] = None, ) -> Dict: """Route a legacy tool call through the MCP manager, with direct fallbacks.""" mcp = get_mcp_manager() if not mcp: - return await _direct_fallback(tool, content, progress_cb=progress_cb) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1} + return await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1} server_id, tool_name = _MCP_TOOL_MAP[tool] qualified = f"mcp__{server_id}__{tool_name}" @@ -547,7 +590,7 @@ async def _call_mcp_tool( # If MCP server not connected, try direct fallback if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""): - fallback = await _direct_fallback(tool, content, progress_cb=progress_cb) + fallback = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) if fallback: return fallback @@ -574,6 +617,7 @@ async def _direct_fallback( tool: str, content: str, progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None, + workspace: Optional[str] = None, ) -> Optional[Dict]: """In-process execution path for the eight tools that used to live as stdio MCP servers under mcp_servers/. Those servers were deleted in @@ -609,7 +653,7 @@ async def _direct_fallback( stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_subproc_env, - cwd=_AGENT_WORKDIR, + cwd=workspace or _AGENT_WORKDIR, ) stdout, stderr, rc, timed_out = await _run_subprocess_streaming( proc, @@ -636,7 +680,7 @@ async def _direct_fallback( stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=_subproc_env, - cwd=_AGENT_WORKDIR, + cwd=workspace or _AGENT_WORKDIR, ) stdout, stderr, rc, timed_out = await _run_subprocess_streaming( proc, @@ -666,7 +710,8 @@ async def _direct_fallback( except (_json.JSONDecodeError, TypeError, ValueError): pass try: - path = _resolve_tool_path(raw_path) + path = (_resolve_tool_path_in_workspace(workspace, raw_path) + if workspace else _resolve_tool_path(raw_path)) except ValueError as e: return {"error": f"read_file: {e}", "exit_code": 1} try: @@ -709,7 +754,8 @@ async def _direct_fallback( raw_path = lines[0].strip() body = lines[1] if len(lines) > 1 else "" try: - path = _resolve_tool_path(raw_path) + path = (_resolve_tool_path_in_workspace(workspace, raw_path) + if workspace else _resolve_tool_path(raw_path)) except ValueError as e: return {"error": f"write_file: {e}", "exit_code": 1} try: @@ -762,7 +808,7 @@ async def _direct_fallback( max_hits = _CODENAV_MAX_HITS max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS)) try: - root = _resolve_search_root(str(args.get("path", ""))) + root = _resolve_search_root(str(args.get("path", "")), workspace) except ValueError as e: return {"error": f"grep: {e}", "exit_code": 1} @@ -846,7 +892,7 @@ async def _direct_fallback( if not pattern: return {"error": "glob: pattern is required", "exit_code": 1} try: - root = _resolve_search_root(str(args.get("path", ""))) + root = _resolve_search_root(str(args.get("path", "")), workspace) except ValueError as e: return {"error": f"glob: {e}", "exit_code": 1} @@ -893,7 +939,7 @@ async def _direct_fallback( else: raw_path = _s.split("\n", 1)[0].strip() try: - root = _resolve_search_root(raw_path) + root = _resolve_search_root(raw_path, workspace) except ValueError as e: return {"error": f"ls: {e}", "exit_code": 1} @@ -1057,6 +1103,7 @@ async def execute_tool_block( disabled_tools: Optional[set] = None, owner: Optional[str] = None, progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None, + workspace: Optional[str] = None, ) -> Tuple[str, Dict]: """Execute a single tool block. Returns (description, result_dict). @@ -1144,7 +1191,7 @@ async def execute_tool_block( _is_bg, _bg_cmd = _split_bg_marker(content) if _is_bg and _bg_cmd: from src import bg_jobs - rec = bg_jobs.launch(_bg_cmd, session_id=session_id) + rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=workspace or _AGENT_WORKDIR) short = _bg_cmd.strip().split(chr(10))[0][:80] desc = f"bash (background): {short}" result = { @@ -1166,12 +1213,13 @@ async def execute_tool_block( if tool in _MCP_TOOL_MAP: first_line = content.split(chr(10))[0][:80] desc = f"{tool}: {first_line}" - result = await _call_mcp_tool(tool, content, progress_cb=progress_cb) + result = await _call_mcp_tool(tool, content, progress_cb=progress_cb, workspace=workspace) elif tool in ("grep", "glob", "ls"): # Code-navigation tools — no MCP server; run the direct implementation. + # Confined to the workspace when one is set (same policy as read_file). first_line = content.split(chr(10))[0][:80] desc = f"{tool}: {first_line}" - result = await _direct_fallback(tool, content, progress_cb=progress_cb) \ + result = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) \ or {"error": f"{tool}: execution failed", "exit_code": 1} elif tool == "create_document": title = content.split("\n")[0].strip()[:60] @@ -1273,7 +1321,7 @@ async def execute_tool_block( desc = "edit_image" result = await do_edit_image(content, owner=owner) elif tool == "edit_file": - result = await _do_edit_file(content) + result = await _do_edit_file(content, workspace=workspace) desc = result.get("output") or result.get("error") or "edit_file" elif tool == "trigger_research": desc = "trigger_research" diff --git a/static/app.js b/static/app.js index 8593da3..08ab121 100644 --- a/static/app.js +++ b/static/app.js @@ -4,6 +4,7 @@ // ============================================ import Storage from './js/storage.js'; import uiModule from './js/ui.js'; +import workspaceModule from './js/workspace.js'; import fileHandlerModule from './js/fileHandler.js'; import modelsModule from './js/models.js'; import ragModule from './js/rag.js'; @@ -1687,6 +1688,7 @@ function initializeEventListeners() { } setupToggle('web-toggle-btn', 'web-toggle', 'web'); setupToggle('bash-toggle-btn', 'bash-toggle', 'bash'); + try { workspaceModule.initWorkspace(); } catch (_) {} // Document editor toggle (special: uses module panel, not a checkbox) const overflowDocBtn = el('overflow-doc-btn'); diff --git a/static/index.html b/static/index.html index 03edfa9..c5f3828 100644 --- a/static/index.html +++ b/static/index.html @@ -1031,6 +1031,13 @@ RAG + + + + + + + `; + document.body.appendChild(_modal); + _modal.querySelector('#workspace-close').addEventListener('click', closeWorkspaceBrowser); + _modal.querySelector('#workspace-cancel').addEventListener('click', closeWorkspaceBrowser); + // Editable path bar: Enter navigates to a typed/pasted folder. + _modal.querySelector('#workspace-cur-path').addEventListener('keydown', (e) => { + if (e.key === 'Enter') { + e.preventDefault(); + const v = e.target.value.trim(); + if (v) _navigate(v); + } + }); + _modal.querySelector('#workspace-use').addEventListener('click', () => { + setWorkspace(_curPath); + if (uiModule && uiModule.showToast) uiModule.showToast(`Workspace set: ${_basename(_curPath)}`); + closeWorkspaceBrowser(); + }); + const content = _modal.querySelector('.modal-content'); + const header = _modal.querySelector('.modal-header'); + if (content && header) makeWindowDraggable(_modal, { content, header }); + return _modal; +} + +export async function openWorkspaceBrowser() { + const modal = _getModal(); + modal.style.display = 'flex'; + try { + _render(await _load(getWorkspace() || '')); + } catch (e) { + if (uiModule && uiModule.showError) uiModule.showError('Could not browse folders'); + } +} + +export function closeWorkspaceBrowser() { + if (_modal) _modal.style.display = 'none'; +} + +export function initWorkspace() { + // Restore persisted workspace into the pill on load. + syncWorkspaceIndicator(getWorkspace()); + const overflow = document.getElementById('overflow-workspace-btn'); + if (overflow) overflow.addEventListener('click', openWorkspaceBrowser); + const pill = document.getElementById('workspace-indicator-btn'); + if (pill) pill.addEventListener('click', clearWorkspace); +} + +export default { initWorkspace, openWorkspaceBrowser, getWorkspace, setWorkspace, clearWorkspace, syncWorkspaceIndicator }; diff --git a/static/style.css b/static/style.css index 1710504..39f1e9e 100644 --- a/static/style.css +++ b/static/style.css @@ -35877,3 +35877,46 @@ body.theme-frosted .modal { line-height: 1.4; color: color-mix(in srgb, var(--fg) 45%, transparent); } +/* ── Workspace picker ───────────────────────────────────────────── */ +/* Layout (width/flex column/max-height) inherited from base .modal-content. */ +/* Editable path/address bar: reuses .styled-prompt-input for border/bg/radius/ + focus ring (set in the element's class list). Overrides only the deltas: + mono font, and full-bleed via flex stretch with no horizontal margin (the + modal-content's 10px padding is the gutter) instead of the base width:100%, + which overflowed against the overflow:auto scrollbar. */ +.workspace-cur { + align-self: stretch; + width: auto; + min-width: 0; + margin: 4px 0 8px; + font-family: var(--mono, monospace); + font-size: 12px; +} +/* flex/overflow inherited from base .modal-body; only the padding differs. */ +.workspace-body { padding: 6px 0; } +.workspace-row { + padding: 7px 18px; + cursor: pointer; + font-size: 13px; + display: flex; + align-items: center; + gap: 8px; +} +.workspace-row > span { + white-space: nowrap; + overflow: hidden; + text-overflow: ellipsis; +} +.workspace-row-icon { flex-shrink: 0; opacity: 0.75; } +.workspace-row:hover { + background: color-mix(in srgb, var(--border) 20%, transparent); +} +.workspace-up { opacity: 0.7; } +.workspace-empty { padding: 14px 18px; opacity: 0.5; font-size: 13px; } +.workspace-footer { + display: flex; + justify-content: flex-end; + gap: 8px; + padding: 10px 18px; + border-top: 1px solid var(--border); +} diff --git a/tests/test_workspace_confine.py b/tests/test_workspace_confine.py new file mode 100644 index 0000000..94ab327 --- /dev/null +++ b/tests/test_workspace_confine.py @@ -0,0 +1,128 @@ +"""Workspace confinement: file tools are hard-bounded to the workspace folder +(layered on upstream's sensitive-path policy); bash runs with cwd there.""" +import os +import tempfile + +import pytest + +from src.tool_execution import _resolve_tool_path_in_workspace, _direct_fallback + + +def test_workspace_resolver_confines(): + ws = tempfile.mkdtemp() + open(os.path.join(ws, "a.txt"), "w").write("x") + real = os.path.realpath(os.path.join(ws, "a.txt")) + # relative path resolves under the workspace + assert _resolve_tool_path_in_workspace(ws, "a.txt") == real + # absolute path inside the workspace is allowed + assert _resolve_tool_path_in_workspace(ws, os.path.join(ws, "a.txt")) == real + # absolute path outside is rejected (sibling temp dir, portable across OSes) + outside = tempfile.mkdtemp() + with pytest.raises(ValueError): + _resolve_tool_path_in_workspace(ws, os.path.join(outside, "x.txt")) + # parent-escape is rejected + with pytest.raises(ValueError): + _resolve_tool_path_in_workspace(ws, os.path.join("..", "..", "escape.txt")) + + +def test_workspace_resolver_blocks_sensitive(): + """Upstream's sensitive-file deny list still applies inside the workspace.""" + ws = tempfile.mkdtemp() + os.makedirs(os.path.join(ws, ".ssh"), exist_ok=True) + with pytest.raises(ValueError): + _resolve_tool_path_in_workspace(ws, ".ssh/authorized_keys") + + +@pytest.mark.asyncio +async def test_read_write_confined_in_workspace(): + ws = tempfile.mkdtemp() + # Write inside the workspace (relative path) succeeds. + res = await _direct_fallback("write_file", "note.txt\nhello", workspace=ws) + assert res["exit_code"] == 0 + assert os.path.isfile(os.path.join(ws, "note.txt")) + # Read it back. + res = await _direct_fallback("read_file", "note.txt", workspace=ws) + assert res["exit_code"] == 0 and res["output"] == "hello" + # Reading outside the workspace is rejected (sibling temp dir, portable). + outside = tempfile.mkdtemp() + outside_file = os.path.join(outside, "secret.txt") + open(outside_file, "w").write("nope") + res = await _direct_fallback("read_file", outside_file, workspace=ws) + assert res["exit_code"] == 1 and "outside the workspace" in res["error"] + # Writing outside is rejected (file must not be created). + escape = os.path.join(outside, "_ws_escape.txt") + res = await _direct_fallback("write_file", f"{escape}\nx", workspace=ws) + assert res["exit_code"] == 1 and "outside the workspace" in res["error"] + assert not os.path.exists(escape) + + +def test_browse_is_admin_gated(monkeypatch): + """The directory-browser endpoint must refuse non-admin callers.""" + from fastapi import HTTPException + import routes.workspace_routes as wr + + router = wr.setup_workspace_routes() + browse = next(r.endpoint for r in router.routes if r.path == "/api/workspace/browse") + + monkeypatch.setattr(wr, "get_current_user", lambda req: "bob") + monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: False) + with pytest.raises(HTTPException) as ei: + browse(request=object(), path="/") + assert ei.value.status_code == 403 + + # Admin / single-user is allowed. + monkeypatch.setattr(wr, "owner_is_admin_or_single_user", lambda owner: True) + out = browse(request=object(), path=os.path.expanduser("~")) + assert "dirs" in out and "path" in out + assert all("name" in d and "path" in d for d in out["dirs"]) + + +@pytest.mark.asyncio +async def test_subprocess_runs_with_workspace_cwd(): + """bash/python subprocesses run with cwd set to the workspace. Use the + python tool for an OS-agnostic cwd probe (Windows cmd has no `pwd`).""" + ws = tempfile.mkdtemp() + res = await _direct_fallback("python", "import os; print(os.getcwd())", workspace=ws) + assert res["exit_code"] == 0 + assert os.path.realpath(res["output"].strip()) == os.path.realpath(ws) + + +# --- Tools that landed after this PR, now wired into the workspace ----------- + +@pytest.mark.asyncio +async def test_edit_file_confined_in_workspace(): + import json + from src.tool_execution import _do_edit_file + ws = tempfile.mkdtemp() + open(os.path.join(ws, "f.txt"), "w").write("foo bar") + # Edit inside the workspace succeeds. + res = await _do_edit_file(json.dumps( + {"path": "f.txt", "old_string": "foo", "new_string": "baz"}), workspace=ws) + assert res["exit_code"] == 0 + assert open(os.path.join(ws, "f.txt")).read() == "baz bar" + # Editing outside the workspace is rejected (sibling temp dir, portable). + outside = tempfile.mkdtemp() + outside_file = os.path.join(outside, "f.txt") + open(outside_file, "w").write("a") + res = await _do_edit_file(json.dumps( + {"path": outside_file, "old_string": "a", "new_string": "b"}), workspace=ws) + assert res["exit_code"] == 1 and "outside the workspace" in res["error"] + + +@pytest.mark.asyncio +async def test_grep_and_ls_confined_in_workspace(): + import json + ws = tempfile.mkdtemp() + open(os.path.join(ws, "doc.txt"), "w").write("hello workspace\n") + # grep with no path searches the workspace root and finds the match. + res = await _direct_fallback("grep", json.dumps({"pattern": "hello"}), workspace=ws) + assert res["exit_code"] == 0 and "doc.txt" in res["output"] + # grep pointed outside the workspace is rejected (sibling temp dir, portable). + outside = tempfile.mkdtemp() + res = await _direct_fallback("grep", json.dumps({"pattern": "x", "path": outside}), workspace=ws) + assert res["exit_code"] == 1 and "outside the workspace" in res["error"] + # ls of the workspace lists its files; ls outside is rejected. + res = await _direct_fallback("ls", "", workspace=ws) + assert res["exit_code"] == 0 and "doc.txt" in res["output"] + res = await _direct_fallback("ls", outside, workspace=ws) + assert res["exit_code"] == 1 and "outside the workspace" in res["error"]