feat: Add workspace: confine agent tools to a folder (#1103)

* feat: Add workspace: confine agent tools to a folder

Pick a server folder as the agent's workspace so its file/shell tools work
there and don't touch files outside it. File tools are hard-confined; bash/
python run with cwd set to the folder.

Includes a slash command: `/workspace` (alias `/ws`) — show / `set <path>` /
`clear` / `pick` (open the directory browser).

- routes/workspace_routes.py: GET /api/workspace/browse (admin-only).
- src/tool_execution.py: hard path confinement for read_file/write_file;
  bash/python cwd. Threaded route → stream_agent_loop → execute_tool_block.
- src/agent_loop.py: workspace note prepended to the system prompt.
- static/: overflow menu item, input-bar pill, directory-browser modal, and
  the /workspace slash command.
- tests/test_workspace_confine.py.

* Wire workspace confinement into tools that landed after this PR

edit_file (#1239) and grep/glob/ls (#1670) merged after workspace-confine was
written, so they bypassed the workspace boundary. Thread the workspace through:
  - edit_file: _do_edit_file resolves via _resolve_tool_path_in_workspace
  - grep/glob/ls: _resolve_search_root confines to the workspace (root + paths)
  - bash/python/bg cwd: workspace or _AGENT_WORKDIR (keep the #2586 data-dir
    default when no workspace is set)
Tests cover edit_file + grep/ls confinement (inside ok, outside rejected).

* Workspace picker: editable path bar + modal style cohesion + cross-platform hardening

- Make the current-folder strip an editable address bar: type/paste a full
  path and press Enter to navigate (also reaches other Windows drives and
  hidden dirs the up-only browser cannot).
- Reuse shared modal CSS: drop bespoke .workspace-modal-content/.workspace-btn*
  in favour of base .modal-content/.modal-body and the .confirm-btn button
  family; separators/hover use var(--border). Net -31 CSS lines.
- Fix the path field overflowing the modal right edge (flex stretch + margin
  vs an overflow:auto scrollbar-feedback loop): full-bleed, no h-margin.
- Cross-platform confinement: normcase the workspace commonpath check so
  containment holds on case-insensitive filesystems (Windows/macOS).
- Make tests OS-portable: sibling temp dirs instead of /etc, python os.getcwd()
  instead of pwd. 5 pass.
This commit is contained in:
Kenny Van de Maele
2026-06-05 00:06:37 +02:00
committed by GitHub
parent 7b4365fe57
commit 2be3779e6e
13 changed files with 549 additions and 22 deletions

View File

@@ -67,12 +67,13 @@ def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
}
async def _do_edit_file(content: str) -> Dict[str, Any]:
async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[str, Any]:
"""Exact string-replacement edit of an on-disk file.
content is JSON: {"path", "old_string", "new_string", "replace_all"?}.
Fails if old_string is missing or non-unique (unless replace_all) so the
model can't silently edit the wrong place. Returns a unified diff for the UI.
Confined to the workspace when one is set (same policy as write_file).
"""
try:
args = json.loads(content) if content.strip().startswith("{") else {}
@@ -84,9 +85,11 @@ async def _do_edit_file(content: str) -> Dict[str, Any]:
replace_all = bool(args.get("replace_all", False))
if not raw_path:
return {"error": "edit_file: path required", "exit_code": 1}
# Confine to the same allowlist + sensitive-file policy as read/write_file.
# Confine to the workspace when set, else the same allowlist + sensitive-file
# policy as read/write_file.
try:
path = _resolve_tool_path(raw_path)
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
if workspace else _resolve_tool_path(raw_path))
except ValueError as e:
return {"error": f"edit_file: {e}", "exit_code": 1}
if old == "":
@@ -268,6 +271,40 @@ def _resolve_tool_path(raw_path: str) -> str:
f"path '{raw_path}' is outside the allowed roots"
)
def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str:
"""Confine a model-supplied path to the active workspace.
Layered on top of upstream's path policy: the workspace is the allowed
root (relative paths resolve under it; paths that escape it are rejected),
and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies
inside it. When no workspace is set, callers use _resolve_tool_path (the
default data/tmp allowlist) instead.
"""
if raw_path is None or not str(raw_path).strip():
raise ValueError("path is required")
base = os.path.realpath(workspace)
expanded = os.path.expanduser(str(raw_path).strip())
candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded)
resolved = os.path.realpath(candidate)
if _is_sensitive_path(resolved):
raise ValueError(
f"path '{raw_path}' is inside a sensitive directory "
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
)
if resolved != base:
# normcase so containment holds on case-insensitive filesystems
# (Windows, default macOS): it lowercases on Windows and is a no-op on
# POSIX. commonpath raises ValueError across Windows drives (C: vs D:)
# or mixed abs/rel — both mean "outside", so the except rejects them.
nbase = os.path.normcase(base)
try:
if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase:
raise ValueError
except ValueError:
raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})")
return resolved
# Bash + python tools used to share a single 60s timeout. That's
# enough for one-shot commands but starves real workloads (pip
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
@@ -310,14 +347,19 @@ _CODENAV_MAX_HITS = 200
_CODENAV_MAX_LINE = 400
def _resolve_search_root(raw_path: str) -> str:
def _resolve_search_root(raw_path: str, workspace: Optional[str] = None) -> str:
"""Resolve + confine a code-nav path (grep/glob/ls).
Empty path → the agent's primary root (first allowlisted root, i.e. the
project data dir). A supplied path is confined by the same allowlist +
sensitive-file policy as read_file (_resolve_tool_path).
With a workspace set, the workspace folder is the root and supplied paths are
confined inside it (same policy as read_file). Without one, an empty path
defaults to the agent's primary root (project data dir) and a supplied path
is confined by the global allowlist + sensitive-file policy.
"""
raw = (raw_path or "").strip()
if workspace:
if not raw:
return os.path.realpath(workspace)
return _resolve_tool_path_in_workspace(workspace, raw)
if not raw:
roots = _tool_path_roots()
return roots[0] if roots else os.path.realpath(".")
@@ -534,11 +576,12 @@ async def _call_mcp_tool(
tool: str,
content: str,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
workspace: Optional[str] = None,
) -> Dict:
"""Route a legacy tool call through the MCP manager, with direct fallbacks."""
mcp = get_mcp_manager()
if not mcp:
return await _direct_fallback(tool, content, progress_cb=progress_cb) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
return await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
server_id, tool_name = _MCP_TOOL_MAP[tool]
qualified = f"mcp__{server_id}__{tool_name}"
@@ -547,7 +590,7 @@ async def _call_mcp_tool(
# If MCP server not connected, try direct fallback
if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""):
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb)
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace)
if fallback:
return fallback
@@ -574,6 +617,7 @@ async def _direct_fallback(
tool: str,
content: str,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
workspace: Optional[str] = None,
) -> Optional[Dict]:
"""In-process execution path for the eight tools that used to live as
stdio MCP servers under mcp_servers/. Those servers were deleted in
@@ -609,7 +653,7 @@ async def _direct_fallback(
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=_subproc_env,
cwd=_AGENT_WORKDIR,
cwd=workspace or _AGENT_WORKDIR,
)
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
proc,
@@ -636,7 +680,7 @@ async def _direct_fallback(
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=_subproc_env,
cwd=_AGENT_WORKDIR,
cwd=workspace or _AGENT_WORKDIR,
)
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
proc,
@@ -666,7 +710,8 @@ async def _direct_fallback(
except (_json.JSONDecodeError, TypeError, ValueError):
pass
try:
path = _resolve_tool_path(raw_path)
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
if workspace else _resolve_tool_path(raw_path))
except ValueError as e:
return {"error": f"read_file: {e}", "exit_code": 1}
try:
@@ -709,7 +754,8 @@ async def _direct_fallback(
raw_path = lines[0].strip()
body = lines[1] if len(lines) > 1 else ""
try:
path = _resolve_tool_path(raw_path)
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
if workspace else _resolve_tool_path(raw_path))
except ValueError as e:
return {"error": f"write_file: {e}", "exit_code": 1}
try:
@@ -762,7 +808,7 @@ async def _direct_fallback(
max_hits = _CODENAV_MAX_HITS
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
try:
root = _resolve_search_root(str(args.get("path", "")))
root = _resolve_search_root(str(args.get("path", "")), workspace)
except ValueError as e:
return {"error": f"grep: {e}", "exit_code": 1}
@@ -846,7 +892,7 @@ async def _direct_fallback(
if not pattern:
return {"error": "glob: pattern is required", "exit_code": 1}
try:
root = _resolve_search_root(str(args.get("path", "")))
root = _resolve_search_root(str(args.get("path", "")), workspace)
except ValueError as e:
return {"error": f"glob: {e}", "exit_code": 1}
@@ -893,7 +939,7 @@ async def _direct_fallback(
else:
raw_path = _s.split("\n", 1)[0].strip()
try:
root = _resolve_search_root(raw_path)
root = _resolve_search_root(raw_path, workspace)
except ValueError as e:
return {"error": f"ls: {e}", "exit_code": 1}
@@ -1057,6 +1103,7 @@ async def execute_tool_block(
disabled_tools: Optional[set] = None,
owner: Optional[str] = None,
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
workspace: Optional[str] = None,
) -> Tuple[str, Dict]:
"""Execute a single tool block. Returns (description, result_dict).
@@ -1144,7 +1191,7 @@ async def execute_tool_block(
_is_bg, _bg_cmd = _split_bg_marker(content)
if _is_bg and _bg_cmd:
from src import bg_jobs
rec = bg_jobs.launch(_bg_cmd, session_id=session_id)
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=workspace or _AGENT_WORKDIR)
short = _bg_cmd.strip().split(chr(10))[0][:80]
desc = f"bash (background): {short}"
result = {
@@ -1166,12 +1213,13 @@ async def execute_tool_block(
if tool in _MCP_TOOL_MAP:
first_line = content.split(chr(10))[0][:80]
desc = f"{tool}: {first_line}"
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb)
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb, workspace=workspace)
elif tool in ("grep", "glob", "ls"):
# Code-navigation tools — no MCP server; run the direct implementation.
# Confined to the workspace when one is set (same policy as read_file).
first_line = content.split(chr(10))[0][:80]
desc = f"{tool}: {first_line}"
result = await _direct_fallback(tool, content, progress_cb=progress_cb) \
result = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) \
or {"error": f"{tool}: execution failed", "exit_code": 1}
elif tool == "create_document":
title = content.split("\n")[0].strip()[:60]
@@ -1273,7 +1321,7 @@ async def execute_tool_block(
desc = "edit_image"
result = await do_edit_image(content, owner=owner)
elif tool == "edit_file":
result = await _do_edit_file(content)
result = await _do_edit_file(content, workspace=workspace)
desc = result.get("output") or result.get("error") or "edit_file"
elif tool == "trigger_research":
desc = "trigger_research"