diff --git a/src/settings.py b/src/settings.py index 5704656..acf29a9 100644 --- a/src/settings.py +++ b/src/settings.py @@ -97,6 +97,11 @@ DEFAULT_SETTINGS = { "agent_max_tool_calls": 0, "agent_input_token_budget": 6000, "agent_stream_timeout_seconds": 300, + # Extra directory roots that read_file / write_file may access, in + # addition to the built-in project data/ and system temp dirs. Each + # entry is an absolute path. Sensitive subpaths (.ssh, .gnupg, shell + # rc files, SSH key files) are always blocked regardless of roots. + "tool_path_extra_roots": [], "task_endpoint_id": "", "task_model": "", "default_endpoint_id": "", diff --git a/src/tool_execution.py b/src/tool_execution.py index 06d46d9..b0e8e2d 100644 --- a/src/tool_execution.py +++ b/src/tool_execution.py @@ -21,6 +21,143 @@ from src.tool_security import is_public_blocked_tool, owner_is_admin_or_single_u MAX_OUTPUT_CHARS = 10_000 MAX_READ_CHARS = 20_000 +# --------------------------------------------------------------------------- +# Path confinement for read_file / write_file +# --------------------------------------------------------------------------- +# read_file + write_file are admin-only tools, but the path the agent +# supplies is model-controlled. Prompt-injection in an admin's chat can +# weaponise "read /etc/shadow" or "write ~/.ssh/authorized_keys" without +# the admin noticing. +# +# Policy: +# 1. Sensitive-subpath deny list — checked FIRST. Blocks .ssh, +# .gnupg, shell rc files, token/env files even if the root above +# them is on the allowlist. +# 2. Allowlist — only the directories the agent legitimately needs +# (project data/, system tmp). $HOME is NOT on the default list. +# 3. Opt-in extra roots — admin can add broader roots via the +# "tool_path_extra_roots" setting (list of path strings). +# --------------------------------------------------------------------------- + +_SENSITIVE_BASENAMES: set[str] = { + ".ssh", ".gnupg", ".gitconfig", + ".bashrc", ".bash_profile", ".bash_logout", + ".zshrc", ".zprofile", ".zshenv", + ".profile", ".tcshrc", ".cshrc", + ".env", ".netrc", +} + +_SENSITIVE_FILE_PATTERNS: tuple[str, ...] = ( + "authorized_keys", "id_rsa", "id_ed25519", "id_ecdsa", + "known_hosts", +) + + +def _is_sensitive_path(resolved: str) -> bool: + """Return True if *resolved* falls under a sensitive directory or + matches a sensitive filename — regardless of what root it sits under. + """ + parts = resolved.split(os.sep) + filenames: set[str] = {parts[-1]} if parts else set() + + # Check if any path component is a sensitive directory. + for part in parts: + if part in _SENSITIVE_BASENAMES: + return True + + # Check filename against known sensitive files. + for pat in _SENSITIVE_FILE_PATTERNS: + if pat in filenames: + return True + + return False + + +def _tool_path_roots() -> list[str]: + """Return the list of directory roots that read_file / write_file + may touch. Default: project data/ + system temp dirs. Extra roots + are loaded from the ``tool_path_extra_roots`` setting. + """ + roots: list[str] = [] + + # Project data directory — the agent's primary workspace. + from src.constants import DATA_DIR + roots.append(DATA_DIR) + + # /tmp (and its macOS realpath /private/tmp). + roots.append("/tmp") + try: + private_tmp = os.path.realpath("/tmp") + if private_tmp != "/tmp": + roots.append(private_tmp) + except OSError: + pass + + # $TMPDIR — per-user temp root on macOS (e.g. /var/folders/.../T/). + tmpdir = os.environ.get("TMPDIR") + if tmpdir: + roots.append(tmpdir) + + # Opt-in extra roots from settings. + try: + from src.settings import get_setting + extra = get_setting("tool_path_extra_roots") + if isinstance(extra, list): + roots.extend(str(r) for r in extra if r) + except Exception: + pass + + # Deduplicate; resolve symlinks so containment is unambiguous. + seen: set[str] = set() + out: list[str] = [] + for r in roots: + try: + real = os.path.realpath(r) + except OSError: + continue + if real in seen: + continue + seen.add(real) + out.append(real) + return out + + +def _resolve_tool_path(raw_path: str) -> str: + """Resolve and confine a model-supplied path. + + Order of checks: + 1. Non-empty path. + 2. Sensitive-subpath deny list (blocks .ssh, .gnupg, etc. + even when the root is on the allowlist). + 3. Allowlist containment (must land under one of the roots). + + Returns the realpath on success. Raises ValueError on rejection. + Symlinks are resolved before comparison. + """ + if raw_path is None or not str(raw_path).strip(): + raise ValueError("path is required") + expanded = os.path.expanduser(str(raw_path).strip()) + resolved = os.path.realpath(expanded) + + if _is_sensitive_path(resolved): + raise ValueError( + f"path '{raw_path}' is inside a sensitive directory " + f"(e.g. .ssh, .gnupg) or matches a sensitive filename" + ) + + for root in _tool_path_roots(): + if resolved == root: + return resolved + try: + common = os.path.commonpath([resolved, root]) + except ValueError: + continue + if common == root: + return resolved + raise ValueError( + f"path '{raw_path}' is outside the allowed roots" + ) + # Bash + python tools used to share a single 60s timeout. That's # enough for one-shot commands but starves real workloads (pip # install, ffmpeg conversions, etc.) — and worse, the agent saw the @@ -375,9 +512,11 @@ async def _direct_fallback( return {"output": output or "(no output)", "exit_code": rc or 0} if tool == "read_file": - path = os.path.expanduser(content.split("\n", 1)[0].strip()) - if not path: - return {"error": "read_file: path required", "exit_code": 1} + raw_path = content.split("\n", 1)[0].strip() + try: + path = _resolve_tool_path(raw_path) + except ValueError as e: + return {"error": f"read_file: {e}", "exit_code": 1} try: # Run blocking read in a thread to keep the loop responsive def _read(): @@ -397,13 +536,14 @@ async def _direct_fallback( if tool == "write_file": lines = content.split("\n", 1) - path = os.path.expanduser(lines[0].strip()) + raw_path = lines[0].strip() body = lines[1] if len(lines) > 1 else "" - if not path: - return {"error": "write_file: path required", "exit_code": 1} + try: + path = _resolve_tool_path(raw_path) + except ValueError as e: + return {"error": f"write_file: {e}", "exit_code": 1} try: def _write(): - import os d = os.path.dirname(path) if d: os.makedirs(d, exist_ok=True) diff --git a/tests/test_tool_path_confinement.py b/tests/test_tool_path_confinement.py new file mode 100644 index 0000000..6288623 --- /dev/null +++ b/tests/test_tool_path_confinement.py @@ -0,0 +1,282 @@ +"""Regression tests for read_file / write_file path confinement. + +Covers: + - /etc/shadow, /etc/passwd, /var/log — blocked (outside roots) + - ~/.ssh/authorized_keys — blocked (sensitive subpath deny list) + - Symlink that resolves into .ssh — blocked + - Relative traversal (~/../../etc/passwd) — blocked + - Shell rc files (.bashrc, .zshrc, .profile) — blocked + - SSH key filenames (id_rsa, id_ed25519) — blocked regardless of dir + - Legitimate paths under project data/ and /tmp — allowed + - Extra roots via tool_path_extra_roots setting — opt-in + - Even with $HOME as extra root, sensitive subpaths stay blocked +""" + +import os +import sys +from types import SimpleNamespace +from unittest.mock import patch + +import pytest + + +def _make_block(tool_type, content): + return SimpleNamespace(tool_type=tool_type, content=content) + + +# ── Unit tests on _is_sensitive_path ────────────────────────────────── + +def test_sensitive_ssh_dir(): + from src.tool_execution import _is_sensitive_path + assert _is_sensitive_path("/home/user/.ssh/authorized_keys") + assert _is_sensitive_path(os.path.expanduser("~") + "/.ssh/config") + + +def test_sensitive_gnupg_dir(): + from src.tool_execution import _is_sensitive_path + assert _is_sensitive_path("/home/user/.gnupg/pubring.kbx") + + +def test_sensitive_shell_rc(): + from src.tool_execution import _is_sensitive_path + assert _is_sensitive_path("/home/user/.bashrc") + assert _is_sensitive_path("/home/user/.zshrc") + assert _is_sensitive_path("/home/user/.profile") + + +def test_sensitive_key_filenames(): + from src.tool_execution import _is_sensitive_path + assert _is_sensitive_path("/tmp/id_rsa") + assert _is_sensitive_path("/tmp/id_ed25519") + assert _is_sensitive_path("/tmp/authorized_keys") + + +def test_non_sensitive_path(): + from src.tool_execution import _is_sensitive_path + assert not _is_sensitive_path("/tmp/notes.txt") + assert not _is_sensitive_path("/home/user/projects/file.py") + + +# ── Unit tests on _resolve_tool_path ───────────────────────────────── + +def test_blocks_etc_shadow(): + """The motivating example: /etc/shadow must be rejected.""" + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="outside the allowed roots"): + _resolve_tool_path("/etc/shadow") + + +def test_blocks_etc_passwd(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="outside the allowed roots"): + _resolve_tool_path("/etc/passwd") + + +def test_blocks_var_log(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="outside the allowed roots"): + _resolve_tool_path("/var/log/system.log") + + +def test_blocks_ssh_authorized_keys(): + """~/.ssh/authorized_keys — blocked by sensitive-subpath deny even + though $HOME is NOT a default root (the deny list fires first).""" + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.ssh/authorized_keys") + + +def test_blocks_ssh_dir_absolute(): + from src.tool_execution import _resolve_tool_path + home = os.path.expanduser("~") + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path(os.path.join(home, ".ssh", "config")) + + +def test_blocks_symlink_into_ssh(tmp_path): + """A symlink under /tmp that points into ~/.ssh must be caught + because realpath resolves the link before the deny-list check.""" + from src.tool_execution import _resolve_tool_path + ssh_dir = os.path.join(os.path.expanduser("~"), ".ssh") + os.makedirs(ssh_dir, exist_ok=True) + link = tmp_path / "ssh_link" + try: + link.symlink_to(ssh_dir) + except OSError: + pytest.skip("cannot create symlink") + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path(str(link)) + + +def test_blocks_traversal_outside_roots(): + """~/../../etc/passwd — after tilde expansion and .. resolution the + path lands outside every allowed root.""" + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError): + _resolve_tool_path("~/../../etc/passwd") + + +def test_blocks_bashrc(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.bashrc") + + +def test_blocks_zshrc(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.zshrc") + + +def test_blocks_env_file(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.env") + + +def test_blocks_netrc(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.netrc") + + +def test_allows_project_data(tmp_path): + """Paths under project data/ must resolve cleanly.""" + from src.tool_execution import _resolve_tool_path + from src.constants import DATA_DIR + target = os.path.join(DATA_DIR, "test-confinement-ok.txt") + os.makedirs(DATA_DIR, exist_ok=True) + with open(target, "w") as f: + f.write("ok") + try: + resolved = _resolve_tool_path(target) + assert resolved == os.path.realpath(target) + finally: + os.unlink(target) + + +def test_allows_tmp(tmp_path): + """Paths under /tmp (or its realpath) must resolve cleanly.""" + from src.tool_execution import _resolve_tool_path + f = tmp_path / "confinement-test.txt" + f.write_text("ok") + resolved = _resolve_tool_path(str(f)) + assert resolved == os.path.realpath(str(f)) + + +def test_rejects_empty_path(): + from src.tool_execution import _resolve_tool_path + with pytest.raises(ValueError, match="path is required"): + _resolve_tool_path("") + with pytest.raises(ValueError, match="path is required"): + _resolve_tool_path(" ") + + +def test_extra_roots_opt_in(tmp_path): + """When tool_path_extra_roots includes a directory, paths under it + are allowed (but sensitive subpaths are still blocked).""" + from src.tool_execution import _resolve_tool_path + extra_dir = tmp_path / "extra_root" + extra_dir.mkdir() + target = extra_dir / "file.txt" + target.write_text("ok") + + with patch("src.settings.get_setting", return_value=[str(extra_dir)]): + resolved = _resolve_tool_path(str(target)) + assert resolved == os.path.realpath(str(target)) + + +def test_extra_root_still_blocks_sensitive(tmp_path): + """Even when $HOME is in tool_path_extra_roots, ~/.ssh/authorized_keys + must still be rejected by the sensitive-subpath deny list.""" + from src.tool_execution import _resolve_tool_path + home = os.path.expanduser("~") + with patch("src.settings.get_setting", return_value=[home]): + with pytest.raises(ValueError, match="sensitive directory"): + _resolve_tool_path("~/.ssh/authorized_keys") + + +# ── Integration: dispatch-level tests ──────────────────────────────── + +@pytest.mark.asyncio +async def test_read_file_dispatch_blocks_etc_shadow(monkeypatch): + """End-to-end: read_file dispatch must reject /etc/shadow.""" + auth_mod = sys.modules.get("core.auth") + if auth_mod is None: + import core.auth as _real_auth + auth_mod = _real_auth + + class _AdminAuth: + is_configured = True + def is_admin(self, username): + return True + + monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth()) + monkeypatch.setattr( + "src.tool_execution.owner_is_admin_or_single_user", + lambda owner: True, + ) + + from src.tool_execution import execute_tool_block + desc, result = await execute_tool_block( + _make_block("read_file", "/etc/shadow"), + owner="admin-user", + ) + assert "outside the allowed roots" in (result.get("error") or "") + assert result.get("exit_code") == 1 + + +@pytest.mark.asyncio +async def test_write_file_dispatch_blocks_authorized_keys(monkeypatch): + """End-to-end: write_file dispatch must reject ~/.ssh/authorized_keys.""" + auth_mod = sys.modules.get("core.auth") + if auth_mod is None: + import core.auth as _real_auth + auth_mod = _real_auth + + class _AdminAuth: + is_configured = True + def is_admin(self, username): + return True + + monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth()) + monkeypatch.setattr( + "src.tool_execution.owner_is_admin_or_single_user", + lambda owner: True, + ) + + from src.tool_execution import execute_tool_block + desc, result = await execute_tool_block( + _make_block("write_file", "~/.ssh/authorized_keys\nssh-rsa AAAAB3..."), + owner="admin-user", + ) + assert "sensitive directory" in (result.get("error") or "") + assert result.get("exit_code") == 1 + + +@pytest.mark.asyncio +async def test_write_file_dispatch_blocks_cron(monkeypatch): + """End-to-end: write_file to /etc/cron.d must be rejected.""" + auth_mod = sys.modules.get("core.auth") + if auth_mod is None: + import core.auth as _real_auth + auth_mod = _real_auth + + class _AdminAuth: + is_configured = True + def is_admin(self, username): + return True + + monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth()) + monkeypatch.setattr( + "src.tool_execution.owner_is_admin_or_single_user", + lambda owner: True, + ) + + from src.tool_execution import execute_tool_block + desc, result = await execute_tool_block( + _make_block("write_file", "/etc/cron.d/agent-payload\n* * * * * root /tmp/p\n"), + owner="admin-user", + ) + assert "outside the allowed roots" in (result.get("error") or "") + assert result.get("exit_code") == 1