fix(tools): strict path confinement with sensitive-subpath deny list (#1072)
Rework read_file / write_file confinement after review feedback: - Remove $HOME from default allow roots. Only project data/ and system temp dirs are allowed out of the box. - Add a sensitive-subpath deny list (.ssh, .gnupg, shell rc files, .env, .netrc, SSH key filenames). Checked BEFORE allowlist so it blocks even when a broader root is configured. - Add "tool_path_extra_roots" setting for opt-in broader access. - Sensitive subpaths remain blocked regardless of configured roots. Tests: 24 cases covering /etc/shadow, ~/.ssh/authorized_keys, symlink into .ssh, traversal, shell rc files, key filenames, extra roots, and dispatch-level end-to-end.
This commit is contained in:
@@ -97,6 +97,11 @@ DEFAULT_SETTINGS = {
|
|||||||
"agent_max_tool_calls": 0,
|
"agent_max_tool_calls": 0,
|
||||||
"agent_input_token_budget": 6000,
|
"agent_input_token_budget": 6000,
|
||||||
"agent_stream_timeout_seconds": 300,
|
"agent_stream_timeout_seconds": 300,
|
||||||
|
# Extra directory roots that read_file / write_file may access, in
|
||||||
|
# addition to the built-in project data/ and system temp dirs. Each
|
||||||
|
# entry is an absolute path. Sensitive subpaths (.ssh, .gnupg, shell
|
||||||
|
# rc files, SSH key files) are always blocked regardless of roots.
|
||||||
|
"tool_path_extra_roots": [],
|
||||||
"task_endpoint_id": "",
|
"task_endpoint_id": "",
|
||||||
"task_model": "",
|
"task_model": "",
|
||||||
"default_endpoint_id": "",
|
"default_endpoint_id": "",
|
||||||
|
|||||||
@@ -21,6 +21,143 @@ from src.tool_security import is_public_blocked_tool, owner_is_admin_or_single_u
|
|||||||
MAX_OUTPUT_CHARS = 10_000
|
MAX_OUTPUT_CHARS = 10_000
|
||||||
MAX_READ_CHARS = 20_000
|
MAX_READ_CHARS = 20_000
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Path confinement for read_file / write_file
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# read_file + write_file are admin-only tools, but the path the agent
|
||||||
|
# supplies is model-controlled. Prompt-injection in an admin's chat can
|
||||||
|
# weaponise "read /etc/shadow" or "write ~/.ssh/authorized_keys" without
|
||||||
|
# the admin noticing.
|
||||||
|
#
|
||||||
|
# Policy:
|
||||||
|
# 1. Sensitive-subpath deny list — checked FIRST. Blocks .ssh,
|
||||||
|
# .gnupg, shell rc files, token/env files even if the root above
|
||||||
|
# them is on the allowlist.
|
||||||
|
# 2. Allowlist — only the directories the agent legitimately needs
|
||||||
|
# (project data/, system tmp). $HOME is NOT on the default list.
|
||||||
|
# 3. Opt-in extra roots — admin can add broader roots via the
|
||||||
|
# "tool_path_extra_roots" setting (list of path strings).
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_SENSITIVE_BASENAMES: set[str] = {
|
||||||
|
".ssh", ".gnupg", ".gitconfig",
|
||||||
|
".bashrc", ".bash_profile", ".bash_logout",
|
||||||
|
".zshrc", ".zprofile", ".zshenv",
|
||||||
|
".profile", ".tcshrc", ".cshrc",
|
||||||
|
".env", ".netrc",
|
||||||
|
}
|
||||||
|
|
||||||
|
_SENSITIVE_FILE_PATTERNS: tuple[str, ...] = (
|
||||||
|
"authorized_keys", "id_rsa", "id_ed25519", "id_ecdsa",
|
||||||
|
"known_hosts",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_sensitive_path(resolved: str) -> bool:
|
||||||
|
"""Return True if *resolved* falls under a sensitive directory or
|
||||||
|
matches a sensitive filename — regardless of what root it sits under.
|
||||||
|
"""
|
||||||
|
parts = resolved.split(os.sep)
|
||||||
|
filenames: set[str] = {parts[-1]} if parts else set()
|
||||||
|
|
||||||
|
# Check if any path component is a sensitive directory.
|
||||||
|
for part in parts:
|
||||||
|
if part in _SENSITIVE_BASENAMES:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Check filename against known sensitive files.
|
||||||
|
for pat in _SENSITIVE_FILE_PATTERNS:
|
||||||
|
if pat in filenames:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _tool_path_roots() -> list[str]:
|
||||||
|
"""Return the list of directory roots that read_file / write_file
|
||||||
|
may touch. Default: project data/ + system temp dirs. Extra roots
|
||||||
|
are loaded from the ``tool_path_extra_roots`` setting.
|
||||||
|
"""
|
||||||
|
roots: list[str] = []
|
||||||
|
|
||||||
|
# Project data directory — the agent's primary workspace.
|
||||||
|
from src.constants import DATA_DIR
|
||||||
|
roots.append(DATA_DIR)
|
||||||
|
|
||||||
|
# /tmp (and its macOS realpath /private/tmp).
|
||||||
|
roots.append("/tmp")
|
||||||
|
try:
|
||||||
|
private_tmp = os.path.realpath("/tmp")
|
||||||
|
if private_tmp != "/tmp":
|
||||||
|
roots.append(private_tmp)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# $TMPDIR — per-user temp root on macOS (e.g. /var/folders/.../T/).
|
||||||
|
tmpdir = os.environ.get("TMPDIR")
|
||||||
|
if tmpdir:
|
||||||
|
roots.append(tmpdir)
|
||||||
|
|
||||||
|
# Opt-in extra roots from settings.
|
||||||
|
try:
|
||||||
|
from src.settings import get_setting
|
||||||
|
extra = get_setting("tool_path_extra_roots")
|
||||||
|
if isinstance(extra, list):
|
||||||
|
roots.extend(str(r) for r in extra if r)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Deduplicate; resolve symlinks so containment is unambiguous.
|
||||||
|
seen: set[str] = set()
|
||||||
|
out: list[str] = []
|
||||||
|
for r in roots:
|
||||||
|
try:
|
||||||
|
real = os.path.realpath(r)
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if real in seen:
|
||||||
|
continue
|
||||||
|
seen.add(real)
|
||||||
|
out.append(real)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_tool_path(raw_path: str) -> str:
|
||||||
|
"""Resolve and confine a model-supplied path.
|
||||||
|
|
||||||
|
Order of checks:
|
||||||
|
1. Non-empty path.
|
||||||
|
2. Sensitive-subpath deny list (blocks .ssh, .gnupg, etc.
|
||||||
|
even when the root is on the allowlist).
|
||||||
|
3. Allowlist containment (must land under one of the roots).
|
||||||
|
|
||||||
|
Returns the realpath on success. Raises ValueError on rejection.
|
||||||
|
Symlinks are resolved before comparison.
|
||||||
|
"""
|
||||||
|
if raw_path is None or not str(raw_path).strip():
|
||||||
|
raise ValueError("path is required")
|
||||||
|
expanded = os.path.expanduser(str(raw_path).strip())
|
||||||
|
resolved = os.path.realpath(expanded)
|
||||||
|
|
||||||
|
if _is_sensitive_path(resolved):
|
||||||
|
raise ValueError(
|
||||||
|
f"path '{raw_path}' is inside a sensitive directory "
|
||||||
|
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
|
||||||
|
)
|
||||||
|
|
||||||
|
for root in _tool_path_roots():
|
||||||
|
if resolved == root:
|
||||||
|
return resolved
|
||||||
|
try:
|
||||||
|
common = os.path.commonpath([resolved, root])
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if common == root:
|
||||||
|
return resolved
|
||||||
|
raise ValueError(
|
||||||
|
f"path '{raw_path}' is outside the allowed roots"
|
||||||
|
)
|
||||||
|
|
||||||
# Bash + python tools used to share a single 60s timeout. That's
|
# Bash + python tools used to share a single 60s timeout. That's
|
||||||
# enough for one-shot commands but starves real workloads (pip
|
# enough for one-shot commands but starves real workloads (pip
|
||||||
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
|
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
|
||||||
@@ -375,9 +512,11 @@ async def _direct_fallback(
|
|||||||
return {"output": output or "(no output)", "exit_code": rc or 0}
|
return {"output": output or "(no output)", "exit_code": rc or 0}
|
||||||
|
|
||||||
if tool == "read_file":
|
if tool == "read_file":
|
||||||
path = os.path.expanduser(content.split("\n", 1)[0].strip())
|
raw_path = content.split("\n", 1)[0].strip()
|
||||||
if not path:
|
try:
|
||||||
return {"error": "read_file: path required", "exit_code": 1}
|
path = _resolve_tool_path(raw_path)
|
||||||
|
except ValueError as e:
|
||||||
|
return {"error": f"read_file: {e}", "exit_code": 1}
|
||||||
try:
|
try:
|
||||||
# Run blocking read in a thread to keep the loop responsive
|
# Run blocking read in a thread to keep the loop responsive
|
||||||
def _read():
|
def _read():
|
||||||
@@ -397,13 +536,14 @@ async def _direct_fallback(
|
|||||||
|
|
||||||
if tool == "write_file":
|
if tool == "write_file":
|
||||||
lines = content.split("\n", 1)
|
lines = content.split("\n", 1)
|
||||||
path = os.path.expanduser(lines[0].strip())
|
raw_path = lines[0].strip()
|
||||||
body = lines[1] if len(lines) > 1 else ""
|
body = lines[1] if len(lines) > 1 else ""
|
||||||
if not path:
|
try:
|
||||||
return {"error": "write_file: path required", "exit_code": 1}
|
path = _resolve_tool_path(raw_path)
|
||||||
|
except ValueError as e:
|
||||||
|
return {"error": f"write_file: {e}", "exit_code": 1}
|
||||||
try:
|
try:
|
||||||
def _write():
|
def _write():
|
||||||
import os
|
|
||||||
d = os.path.dirname(path)
|
d = os.path.dirname(path)
|
||||||
if d:
|
if d:
|
||||||
os.makedirs(d, exist_ok=True)
|
os.makedirs(d, exist_ok=True)
|
||||||
|
|||||||
282
tests/test_tool_path_confinement.py
Normal file
282
tests/test_tool_path_confinement.py
Normal file
@@ -0,0 +1,282 @@
|
|||||||
|
"""Regression tests for read_file / write_file path confinement.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- /etc/shadow, /etc/passwd, /var/log — blocked (outside roots)
|
||||||
|
- ~/.ssh/authorized_keys — blocked (sensitive subpath deny list)
|
||||||
|
- Symlink that resolves into .ssh — blocked
|
||||||
|
- Relative traversal (~/../../etc/passwd) — blocked
|
||||||
|
- Shell rc files (.bashrc, .zshrc, .profile) — blocked
|
||||||
|
- SSH key filenames (id_rsa, id_ed25519) — blocked regardless of dir
|
||||||
|
- Legitimate paths under project data/ and /tmp — allowed
|
||||||
|
- Extra roots via tool_path_extra_roots setting — opt-in
|
||||||
|
- Even with $HOME as extra root, sensitive subpaths stay blocked
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def _make_block(tool_type, content):
|
||||||
|
return SimpleNamespace(tool_type=tool_type, content=content)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Unit tests on _is_sensitive_path ──────────────────────────────────
|
||||||
|
|
||||||
|
def test_sensitive_ssh_dir():
|
||||||
|
from src.tool_execution import _is_sensitive_path
|
||||||
|
assert _is_sensitive_path("/home/user/.ssh/authorized_keys")
|
||||||
|
assert _is_sensitive_path(os.path.expanduser("~") + "/.ssh/config")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_gnupg_dir():
|
||||||
|
from src.tool_execution import _is_sensitive_path
|
||||||
|
assert _is_sensitive_path("/home/user/.gnupg/pubring.kbx")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_shell_rc():
|
||||||
|
from src.tool_execution import _is_sensitive_path
|
||||||
|
assert _is_sensitive_path("/home/user/.bashrc")
|
||||||
|
assert _is_sensitive_path("/home/user/.zshrc")
|
||||||
|
assert _is_sensitive_path("/home/user/.profile")
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_key_filenames():
|
||||||
|
from src.tool_execution import _is_sensitive_path
|
||||||
|
assert _is_sensitive_path("/tmp/id_rsa")
|
||||||
|
assert _is_sensitive_path("/tmp/id_ed25519")
|
||||||
|
assert _is_sensitive_path("/tmp/authorized_keys")
|
||||||
|
|
||||||
|
|
||||||
|
def test_non_sensitive_path():
|
||||||
|
from src.tool_execution import _is_sensitive_path
|
||||||
|
assert not _is_sensitive_path("/tmp/notes.txt")
|
||||||
|
assert not _is_sensitive_path("/home/user/projects/file.py")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Unit tests on _resolve_tool_path ─────────────────────────────────
|
||||||
|
|
||||||
|
def test_blocks_etc_shadow():
|
||||||
|
"""The motivating example: /etc/shadow must be rejected."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="outside the allowed roots"):
|
||||||
|
_resolve_tool_path("/etc/shadow")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_etc_passwd():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="outside the allowed roots"):
|
||||||
|
_resolve_tool_path("/etc/passwd")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_var_log():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="outside the allowed roots"):
|
||||||
|
_resolve_tool_path("/var/log/system.log")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_ssh_authorized_keys():
|
||||||
|
"""~/.ssh/authorized_keys — blocked by sensitive-subpath deny even
|
||||||
|
though $HOME is NOT a default root (the deny list fires first)."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.ssh/authorized_keys")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_ssh_dir_absolute():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
home = os.path.expanduser("~")
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path(os.path.join(home, ".ssh", "config"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_symlink_into_ssh(tmp_path):
|
||||||
|
"""A symlink under /tmp that points into ~/.ssh must be caught
|
||||||
|
because realpath resolves the link before the deny-list check."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
ssh_dir = os.path.join(os.path.expanduser("~"), ".ssh")
|
||||||
|
os.makedirs(ssh_dir, exist_ok=True)
|
||||||
|
link = tmp_path / "ssh_link"
|
||||||
|
try:
|
||||||
|
link.symlink_to(ssh_dir)
|
||||||
|
except OSError:
|
||||||
|
pytest.skip("cannot create symlink")
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path(str(link))
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_traversal_outside_roots():
|
||||||
|
"""~/../../etc/passwd — after tilde expansion and .. resolution the
|
||||||
|
path lands outside every allowed root."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_resolve_tool_path("~/../../etc/passwd")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_bashrc():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.bashrc")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_zshrc():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.zshrc")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_env_file():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.env")
|
||||||
|
|
||||||
|
|
||||||
|
def test_blocks_netrc():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.netrc")
|
||||||
|
|
||||||
|
|
||||||
|
def test_allows_project_data(tmp_path):
|
||||||
|
"""Paths under project data/ must resolve cleanly."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
from src.constants import DATA_DIR
|
||||||
|
target = os.path.join(DATA_DIR, "test-confinement-ok.txt")
|
||||||
|
os.makedirs(DATA_DIR, exist_ok=True)
|
||||||
|
with open(target, "w") as f:
|
||||||
|
f.write("ok")
|
||||||
|
try:
|
||||||
|
resolved = _resolve_tool_path(target)
|
||||||
|
assert resolved == os.path.realpath(target)
|
||||||
|
finally:
|
||||||
|
os.unlink(target)
|
||||||
|
|
||||||
|
|
||||||
|
def test_allows_tmp(tmp_path):
|
||||||
|
"""Paths under /tmp (or its realpath) must resolve cleanly."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
f = tmp_path / "confinement-test.txt"
|
||||||
|
f.write_text("ok")
|
||||||
|
resolved = _resolve_tool_path(str(f))
|
||||||
|
assert resolved == os.path.realpath(str(f))
|
||||||
|
|
||||||
|
|
||||||
|
def test_rejects_empty_path():
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
with pytest.raises(ValueError, match="path is required"):
|
||||||
|
_resolve_tool_path("")
|
||||||
|
with pytest.raises(ValueError, match="path is required"):
|
||||||
|
_resolve_tool_path(" ")
|
||||||
|
|
||||||
|
|
||||||
|
def test_extra_roots_opt_in(tmp_path):
|
||||||
|
"""When tool_path_extra_roots includes a directory, paths under it
|
||||||
|
are allowed (but sensitive subpaths are still blocked)."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
extra_dir = tmp_path / "extra_root"
|
||||||
|
extra_dir.mkdir()
|
||||||
|
target = extra_dir / "file.txt"
|
||||||
|
target.write_text("ok")
|
||||||
|
|
||||||
|
with patch("src.settings.get_setting", return_value=[str(extra_dir)]):
|
||||||
|
resolved = _resolve_tool_path(str(target))
|
||||||
|
assert resolved == os.path.realpath(str(target))
|
||||||
|
|
||||||
|
|
||||||
|
def test_extra_root_still_blocks_sensitive(tmp_path):
|
||||||
|
"""Even when $HOME is in tool_path_extra_roots, ~/.ssh/authorized_keys
|
||||||
|
must still be rejected by the sensitive-subpath deny list."""
|
||||||
|
from src.tool_execution import _resolve_tool_path
|
||||||
|
home = os.path.expanduser("~")
|
||||||
|
with patch("src.settings.get_setting", return_value=[home]):
|
||||||
|
with pytest.raises(ValueError, match="sensitive directory"):
|
||||||
|
_resolve_tool_path("~/.ssh/authorized_keys")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Integration: dispatch-level tests ────────────────────────────────
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_read_file_dispatch_blocks_etc_shadow(monkeypatch):
|
||||||
|
"""End-to-end: read_file dispatch must reject /etc/shadow."""
|
||||||
|
auth_mod = sys.modules.get("core.auth")
|
||||||
|
if auth_mod is None:
|
||||||
|
import core.auth as _real_auth
|
||||||
|
auth_mod = _real_auth
|
||||||
|
|
||||||
|
class _AdminAuth:
|
||||||
|
is_configured = True
|
||||||
|
def is_admin(self, username):
|
||||||
|
return True
|
||||||
|
|
||||||
|
monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth())
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.tool_execution.owner_is_admin_or_single_user",
|
||||||
|
lambda owner: True,
|
||||||
|
)
|
||||||
|
|
||||||
|
from src.tool_execution import execute_tool_block
|
||||||
|
desc, result = await execute_tool_block(
|
||||||
|
_make_block("read_file", "/etc/shadow"),
|
||||||
|
owner="admin-user",
|
||||||
|
)
|
||||||
|
assert "outside the allowed roots" in (result.get("error") or "")
|
||||||
|
assert result.get("exit_code") == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_write_file_dispatch_blocks_authorized_keys(monkeypatch):
|
||||||
|
"""End-to-end: write_file dispatch must reject ~/.ssh/authorized_keys."""
|
||||||
|
auth_mod = sys.modules.get("core.auth")
|
||||||
|
if auth_mod is None:
|
||||||
|
import core.auth as _real_auth
|
||||||
|
auth_mod = _real_auth
|
||||||
|
|
||||||
|
class _AdminAuth:
|
||||||
|
is_configured = True
|
||||||
|
def is_admin(self, username):
|
||||||
|
return True
|
||||||
|
|
||||||
|
monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth())
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.tool_execution.owner_is_admin_or_single_user",
|
||||||
|
lambda owner: True,
|
||||||
|
)
|
||||||
|
|
||||||
|
from src.tool_execution import execute_tool_block
|
||||||
|
desc, result = await execute_tool_block(
|
||||||
|
_make_block("write_file", "~/.ssh/authorized_keys\nssh-rsa AAAAB3..."),
|
||||||
|
owner="admin-user",
|
||||||
|
)
|
||||||
|
assert "sensitive directory" in (result.get("error") or "")
|
||||||
|
assert result.get("exit_code") == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_write_file_dispatch_blocks_cron(monkeypatch):
|
||||||
|
"""End-to-end: write_file to /etc/cron.d must be rejected."""
|
||||||
|
auth_mod = sys.modules.get("core.auth")
|
||||||
|
if auth_mod is None:
|
||||||
|
import core.auth as _real_auth
|
||||||
|
auth_mod = _real_auth
|
||||||
|
|
||||||
|
class _AdminAuth:
|
||||||
|
is_configured = True
|
||||||
|
def is_admin(self, username):
|
||||||
|
return True
|
||||||
|
|
||||||
|
monkeypatch.setattr(auth_mod, "AuthManager", lambda: _AdminAuth())
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"src.tool_execution.owner_is_admin_or_single_user",
|
||||||
|
lambda owner: True,
|
||||||
|
)
|
||||||
|
|
||||||
|
from src.tool_execution import execute_tool_block
|
||||||
|
desc, result = await execute_tool_block(
|
||||||
|
_make_block("write_file", "/etc/cron.d/agent-payload\n* * * * * root /tmp/p\n"),
|
||||||
|
owner="admin-user",
|
||||||
|
)
|
||||||
|
assert "outside the allowed roots" in (result.get("error") or "")
|
||||||
|
assert result.get("exit_code") == 1
|
||||||
Reference in New Issue
Block a user