* feat: Add workspace: confine agent tools to a folder Pick a server folder as the agent's workspace so its file/shell tools work there and don't touch files outside it. File tools are hard-confined; bash/ python run with cwd set to the folder. Includes a slash command: `/workspace` (alias `/ws`) — show / `set <path>` / `clear` / `pick` (open the directory browser). - routes/workspace_routes.py: GET /api/workspace/browse (admin-only). - src/tool_execution.py: hard path confinement for read_file/write_file; bash/python cwd. Threaded route → stream_agent_loop → execute_tool_block. - src/agent_loop.py: workspace note prepended to the system prompt. - static/: overflow menu item, input-bar pill, directory-browser modal, and the /workspace slash command. - tests/test_workspace_confine.py. * Wire workspace confinement into tools that landed after this PR edit_file (#1239) and grep/glob/ls (#1670) merged after workspace-confine was written, so they bypassed the workspace boundary. Thread the workspace through: - edit_file: _do_edit_file resolves via _resolve_tool_path_in_workspace - grep/glob/ls: _resolve_search_root confines to the workspace (root + paths) - bash/python/bg cwd: workspace or _AGENT_WORKDIR (keep the #2586 data-dir default when no workspace is set) Tests cover edit_file + grep/ls confinement (inside ok, outside rejected). * Workspace picker: editable path bar + modal style cohesion + cross-platform hardening - Make the current-folder strip an editable address bar: type/paste a full path and press Enter to navigate (also reaches other Windows drives and hidden dirs the up-only browser cannot). - Reuse shared modal CSS: drop bespoke .workspace-modal-content/.workspace-btn* in favour of base .modal-content/.modal-body and the .confirm-btn button family; separators/hover use var(--border). Net -31 CSS lines. - Fix the path field overflowing the modal right edge (flex stretch + margin vs an overflow:auto scrollbar-feedback loop): full-bleed, no h-margin. - Cross-platform confinement: normcase the workspace commonpath check so containment holds on case-insensitive filesystems (Windows/macOS). - Make tests OS-portable: sibling temp dirs instead of /etc, python os.getcwd() instead of pwd. 5 pass.
1440 lines
61 KiB
Python
1440 lines
61 KiB
Python
"""
|
||
tool_execution.py
|
||
|
||
Tool dispatcher and result formatter for the agent loop.
|
||
Routes tool blocks to MCP servers or native implementations.
|
||
|
||
Extracted from agent_tools.py.
|
||
"""
|
||
|
||
import asyncio
|
||
import collections
|
||
import json
|
||
import logging
|
||
import os
|
||
import pathlib
|
||
import sys
|
||
import time
|
||
from typing import Any, Awaitable, Callable, Dict, Optional, Tuple
|
||
|
||
from src.tool_security import is_public_blocked_tool, owner_is_admin_or_single_user
|
||
|
||
# Persistent working directory for agent subprocesses.
|
||
# Resolves to <repo_root>/data, which is the bind-mounted volume in Docker
|
||
# (/app/data) and the local data directory for manual installs.
|
||
# Using this as cwd and HOME prevents the agent from silently creating files
|
||
# in ephemeral container layers that are lost on the next rebuild.
|
||
_AGENT_WORKDIR = str(pathlib.Path(__file__).parent.parent / "data")
|
||
|
||
MAX_OUTPUT_CHARS = 10_000
|
||
MAX_READ_CHARS = 20_000
|
||
MAX_DIFF_LINES = 400 # cap unified-diff size returned to the UI
|
||
|
||
|
||
def _unified_diff(old: str, new: str, path: str) -> Optional[Dict[str, Any]]:
|
||
"""Build a unified diff of a file write for display in the chat.
|
||
|
||
Returns {"text": <unified diff>, "added": N, "removed": M, "new_file": bool}
|
||
or None when there's no textual change. Truncates very large diffs.
|
||
"""
|
||
if old == new:
|
||
return None
|
||
import difflib
|
||
|
||
old_lines = old.splitlines()
|
||
new_lines = new.splitlines()
|
||
label = path or "file"
|
||
diff_lines = list(difflib.unified_diff(
|
||
old_lines, new_lines,
|
||
fromfile=f"a/{label}", tofile=f"b/{label}",
|
||
lineterm="",
|
||
))
|
||
added = sum(1 for l in diff_lines if l.startswith("+") and not l.startswith("+++"))
|
||
removed = sum(1 for l in diff_lines if l.startswith("-") and not l.startswith("---"))
|
||
truncated = False
|
||
if len(diff_lines) > MAX_DIFF_LINES:
|
||
diff_lines = diff_lines[:MAX_DIFF_LINES]
|
||
truncated = True
|
||
text = "\n".join(diff_lines)
|
||
if truncated:
|
||
text += f"\n… diff truncated at {MAX_DIFF_LINES} lines"
|
||
return {
|
||
"text": text,
|
||
"added": added,
|
||
"removed": removed,
|
||
"new_file": old == "",
|
||
"file": os.path.basename(path) or (path or "file"),
|
||
}
|
||
|
||
|
||
async def _do_edit_file(content: str, workspace: Optional[str] = None) -> Dict[str, Any]:
|
||
"""Exact string-replacement edit of an on-disk file.
|
||
|
||
content is JSON: {"path", "old_string", "new_string", "replace_all"?}.
|
||
Fails if old_string is missing or non-unique (unless replace_all) so the
|
||
model can't silently edit the wrong place. Returns a unified diff for the UI.
|
||
Confined to the workspace when one is set (same policy as write_file).
|
||
"""
|
||
try:
|
||
args = json.loads(content) if content.strip().startswith("{") else {}
|
||
except (json.JSONDecodeError, TypeError):
|
||
args = {}
|
||
raw_path = (args.get("path") or "").strip()
|
||
old = args.get("old_string", "")
|
||
new = args.get("new_string", "")
|
||
replace_all = bool(args.get("replace_all", False))
|
||
if not raw_path:
|
||
return {"error": "edit_file: path required", "exit_code": 1}
|
||
# Confine to the workspace when set, else the same allowlist + sensitive-file
|
||
# policy as read/write_file.
|
||
try:
|
||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||
if workspace else _resolve_tool_path(raw_path))
|
||
except ValueError as e:
|
||
return {"error": f"edit_file: {e}", "exit_code": 1}
|
||
if old == "":
|
||
return {"error": "edit_file: old_string required (use write_file to create a file)", "exit_code": 1}
|
||
if old == new:
|
||
return {"error": "edit_file: old_string and new_string are identical", "exit_code": 1}
|
||
|
||
def _apply():
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
original = f.read()
|
||
count = original.count(old)
|
||
if count == 0:
|
||
return original, None, "not_found"
|
||
if count > 1 and not replace_all:
|
||
return original, None, f"not_unique:{count}"
|
||
updated = original.replace(old, new) if replace_all else original.replace(old, new, 1)
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
f.write(updated)
|
||
return original, updated, "ok"
|
||
|
||
try:
|
||
original, updated, status = await asyncio.to_thread(_apply)
|
||
except FileNotFoundError:
|
||
return {"error": f"edit_file: {path}: not found (use write_file to create it)", "exit_code": 1}
|
||
except (IsADirectoryError, UnicodeDecodeError):
|
||
return {"error": f"edit_file: {path}: not an editable text file", "exit_code": 1}
|
||
except PermissionError:
|
||
return {"error": f"edit_file: {path}: permission denied", "exit_code": 1}
|
||
except OSError as e:
|
||
return {"error": f"edit_file: {path}: {e}", "exit_code": 1}
|
||
|
||
if status == "not_found":
|
||
return {"error": f"edit_file: old_string not found in {path}. Read the file and match it exactly.", "exit_code": 1}
|
||
if status.startswith("not_unique"):
|
||
n = status.split(":", 1)[1]
|
||
return {"error": f"edit_file: old_string is not unique in {path} ({n} matches). Add surrounding context or set replace_all=true.", "exit_code": 1}
|
||
|
||
n = original.count(old)
|
||
result = {"output": f"Edited {path} ({n} replacement{'s' if n != 1 else ''})", "exit_code": 0}
|
||
diff = _unified_diff(original, updated, path)
|
||
if diff:
|
||
result["diff"] = diff
|
||
return result
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Path confinement for read_file / write_file
|
||
# ---------------------------------------------------------------------------
|
||
# read_file + write_file are admin-only tools, but the path the agent
|
||
# supplies is model-controlled. Prompt-injection in an admin's chat can
|
||
# weaponise "read /etc/shadow" or "write ~/.ssh/authorized_keys" without
|
||
# the admin noticing.
|
||
#
|
||
# Policy:
|
||
# 1. Sensitive-subpath deny list — checked FIRST. Blocks .ssh,
|
||
# .gnupg, shell rc files, token/env files even if the root above
|
||
# them is on the allowlist.
|
||
# 2. Allowlist — only the directories the agent legitimately needs
|
||
# (project data/, system tmp). $HOME is NOT on the default list.
|
||
# 3. Opt-in extra roots — admin can add broader roots via the
|
||
# "tool_path_extra_roots" setting (list of path strings).
|
||
# ---------------------------------------------------------------------------
|
||
|
||
_SENSITIVE_BASENAMES: set[str] = {
|
||
".ssh", ".gnupg", ".gitconfig",
|
||
".bashrc", ".bash_profile", ".bash_logout",
|
||
".zshrc", ".zprofile", ".zshenv",
|
||
".profile", ".tcshrc", ".cshrc",
|
||
".env", ".netrc",
|
||
}
|
||
|
||
_SENSITIVE_FILE_PATTERNS: tuple[str, ...] = (
|
||
"authorized_keys", "id_rsa", "id_ed25519", "id_ecdsa",
|
||
"known_hosts",
|
||
)
|
||
|
||
|
||
def _is_sensitive_path(resolved: str) -> bool:
|
||
"""Return True if *resolved* falls under a sensitive directory or
|
||
matches a sensitive filename — regardless of what root it sits under.
|
||
"""
|
||
parts = resolved.split(os.sep)
|
||
filenames: set[str] = {parts[-1]} if parts else set()
|
||
|
||
# Check if any path component is a sensitive directory.
|
||
for part in parts:
|
||
if part in _SENSITIVE_BASENAMES:
|
||
return True
|
||
|
||
# Check filename against known sensitive files.
|
||
for pat in _SENSITIVE_FILE_PATTERNS:
|
||
if pat in filenames:
|
||
return True
|
||
|
||
return False
|
||
|
||
|
||
def _tool_path_roots() -> list[str]:
|
||
"""Return the list of directory roots that read_file / write_file
|
||
may touch. Default: project data/ + system temp dirs. Extra roots
|
||
are loaded from the ``tool_path_extra_roots`` setting.
|
||
"""
|
||
roots: list[str] = []
|
||
|
||
# Project data directory — the agent's primary workspace.
|
||
from src.constants import DATA_DIR
|
||
roots.append(DATA_DIR)
|
||
|
||
# /tmp (and its macOS realpath /private/tmp).
|
||
roots.append("/tmp")
|
||
try:
|
||
private_tmp = os.path.realpath("/tmp")
|
||
if private_tmp != "/tmp":
|
||
roots.append(private_tmp)
|
||
except OSError:
|
||
pass
|
||
|
||
# $TMPDIR — per-user temp root on macOS (e.g. /var/folders/.../T/).
|
||
tmpdir = os.environ.get("TMPDIR")
|
||
if tmpdir:
|
||
roots.append(tmpdir)
|
||
|
||
# Opt-in extra roots from settings.
|
||
try:
|
||
from src.settings import get_setting
|
||
extra = get_setting("tool_path_extra_roots")
|
||
if isinstance(extra, list):
|
||
roots.extend(str(r) for r in extra if r)
|
||
except Exception:
|
||
pass
|
||
|
||
# Deduplicate; resolve symlinks so containment is unambiguous.
|
||
seen: set[str] = set()
|
||
out: list[str] = []
|
||
for r in roots:
|
||
try:
|
||
real = os.path.realpath(r)
|
||
except OSError:
|
||
continue
|
||
if real in seen:
|
||
continue
|
||
seen.add(real)
|
||
out.append(real)
|
||
return out
|
||
|
||
|
||
def _resolve_tool_path(raw_path: str) -> str:
|
||
"""Resolve and confine a model-supplied path.
|
||
|
||
Order of checks:
|
||
1. Non-empty path.
|
||
2. Sensitive-subpath deny list (blocks .ssh, .gnupg, etc.
|
||
even when the root is on the allowlist).
|
||
3. Allowlist containment (must land under one of the roots).
|
||
|
||
Returns the realpath on success. Raises ValueError on rejection.
|
||
Symlinks are resolved before comparison.
|
||
"""
|
||
if raw_path is None or not str(raw_path).strip():
|
||
raise ValueError("path is required")
|
||
expanded = os.path.expanduser(str(raw_path).strip())
|
||
resolved = os.path.realpath(expanded)
|
||
|
||
if _is_sensitive_path(resolved):
|
||
raise ValueError(
|
||
f"path '{raw_path}' is inside a sensitive directory "
|
||
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
|
||
)
|
||
|
||
for root in _tool_path_roots():
|
||
if resolved == root:
|
||
return resolved
|
||
try:
|
||
common = os.path.commonpath([resolved, root])
|
||
except ValueError:
|
||
continue
|
||
if common == root:
|
||
return resolved
|
||
raise ValueError(
|
||
f"path '{raw_path}' is outside the allowed roots"
|
||
)
|
||
|
||
|
||
def _resolve_tool_path_in_workspace(workspace: str, raw_path: str) -> str:
|
||
"""Confine a model-supplied path to the active workspace.
|
||
|
||
Layered on top of upstream's path policy: the workspace is the allowed
|
||
root (relative paths resolve under it; paths that escape it are rejected),
|
||
and the sensitive-file deny list (.ssh, .gnupg, id_rsa, …) still applies
|
||
inside it. When no workspace is set, callers use _resolve_tool_path (the
|
||
default data/tmp allowlist) instead.
|
||
"""
|
||
if raw_path is None or not str(raw_path).strip():
|
||
raise ValueError("path is required")
|
||
base = os.path.realpath(workspace)
|
||
expanded = os.path.expanduser(str(raw_path).strip())
|
||
candidate = expanded if os.path.isabs(expanded) else os.path.join(base, expanded)
|
||
resolved = os.path.realpath(candidate)
|
||
if _is_sensitive_path(resolved):
|
||
raise ValueError(
|
||
f"path '{raw_path}' is inside a sensitive directory "
|
||
f"(e.g. .ssh, .gnupg) or matches a sensitive filename"
|
||
)
|
||
if resolved != base:
|
||
# normcase so containment holds on case-insensitive filesystems
|
||
# (Windows, default macOS): it lowercases on Windows and is a no-op on
|
||
# POSIX. commonpath raises ValueError across Windows drives (C: vs D:)
|
||
# or mixed abs/rel — both mean "outside", so the except rejects them.
|
||
nbase = os.path.normcase(base)
|
||
try:
|
||
if os.path.commonpath([os.path.normcase(resolved), nbase]) != nbase:
|
||
raise ValueError
|
||
except ValueError:
|
||
raise ValueError(f"path '{raw_path}' is outside the workspace ({workspace})")
|
||
return resolved
|
||
|
||
# Bash + python tools used to share a single 60s timeout. That's
|
||
# enough for one-shot commands but starves real workloads (pip
|
||
# install, ffmpeg conversions, etc.) — and worse, the agent saw the
|
||
# 60s timeout and went silent because it had nothing to report.
|
||
# The new default is intentionally generous: long enough that real
|
||
# work isn't killed mid-flight, but bounded so a runaway process
|
||
# (infinite loop, hung connect, etc.) eventually frees the worker.
|
||
# The user can cancel sooner via the chat stop button — when the
|
||
# SSE stream is torn down, the asyncio task running the subprocess
|
||
# gets cancelled and the subprocess is killed by the finally block.
|
||
DEFAULT_BASH_TIMEOUT = 60 * 60 # 1 hour
|
||
DEFAULT_PYTHON_TIMEOUT = 60 * 60
|
||
|
||
# How often to push a progress event while a long-running subprocess
|
||
# is still in flight. The frontend cares about "alive" more than
|
||
# "every-byte" — 2s is the sweet spot.
|
||
PROGRESS_INTERVAL_S = 2.0
|
||
# Tail buffer size — we keep the most recent N lines of stdout +
|
||
# stderr so the progress event includes a "what's it doing right now"
|
||
# snippet without dragging the whole output along.
|
||
PROGRESS_TAIL_LINES = 12
|
||
|
||
|
||
def get_mcp_manager():
|
||
from src import agent_tools
|
||
return agent_tools.get_mcp_manager()
|
||
|
||
|
||
# Directories ignored by the code-nav tools' Python fallbacks so results aren't
|
||
# polluted by VCS internals / dependency trees / build caches. ripgrep already
|
||
# honours .gitignore; this is the parity floor for the no-rg path (and the
|
||
# explicit excludes passed to rg so it skips them even without a .gitignore).
|
||
_CODENAV_SKIP_DIRS = frozenset({
|
||
".git", ".hg", ".svn", "node_modules", "venv", ".venv", "__pycache__",
|
||
".mypy_cache", ".pytest_cache", ".ruff_cache", "dist", "build",
|
||
".next", ".cache", "site-packages", ".idea", ".tox",
|
||
})
|
||
# Per-tool result caps (keep tool output cheap + model-friendly).
|
||
_CODENAV_MAX_HITS = 200
|
||
_CODENAV_MAX_LINE = 400
|
||
|
||
|
||
def _resolve_search_root(raw_path: str, workspace: Optional[str] = None) -> str:
|
||
"""Resolve + confine a code-nav path (grep/glob/ls).
|
||
|
||
With a workspace set, the workspace folder is the root and supplied paths are
|
||
confined inside it (same policy as read_file). Without one, an empty path
|
||
defaults to the agent's primary root (project data dir) and a supplied path
|
||
is confined by the global allowlist + sensitive-file policy.
|
||
"""
|
||
raw = (raw_path or "").strip()
|
||
if workspace:
|
||
if not raw:
|
||
return os.path.realpath(workspace)
|
||
return _resolve_tool_path_in_workspace(workspace, raw)
|
||
if not raw:
|
||
roots = _tool_path_roots()
|
||
return roots[0] if roots else os.path.realpath(".")
|
||
return _resolve_tool_path(raw)
|
||
|
||
|
||
def _truncate(text: str, limit: int = MAX_OUTPUT_CHARS) -> str:
|
||
if len(text) > limit:
|
||
return text[:limit] + f"\n... (truncated, {len(text)} chars total)"
|
||
return text
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def _run_subprocess_streaming(
|
||
proc: asyncio.subprocess.Process,
|
||
*,
|
||
timeout: float,
|
||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||
) -> Tuple[str, str, Optional[int], bool]:
|
||
"""Run a subprocess to completion, streaming progress.
|
||
|
||
Reads stdout + stderr line-by-line into ring buffers so a
|
||
periodic progress callback can emit a "tail" of recent output
|
||
without waiting for the full result. Returns
|
||
(full_stdout, full_stderr, return_code, timed_out).
|
||
|
||
`timed_out=True` means the process was killed because it ran
|
||
past `timeout` seconds. Whatever output we'd buffered up to
|
||
that point is still returned.
|
||
"""
|
||
started = time.time()
|
||
stdout_full: list[str] = []
|
||
stderr_full: list[str] = []
|
||
tail = collections.deque(maxlen=PROGRESS_TAIL_LINES)
|
||
|
||
async def _reader(stream, full_buf, label: str):
|
||
if stream is None:
|
||
return
|
||
while True:
|
||
line = await stream.readline()
|
||
if not line:
|
||
break
|
||
decoded = line.decode("utf-8", errors="replace").rstrip("\n")
|
||
full_buf.append(decoded)
|
||
if label == "err":
|
||
tail.append(f"! {decoded}")
|
||
else:
|
||
tail.append(decoded)
|
||
|
||
async def _progress_emitter():
|
||
# Skip the first push — many commands finish well under
|
||
# PROGRESS_INTERVAL_S and a 0-second "progress" event would
|
||
# just add UI churn.
|
||
await asyncio.sleep(PROGRESS_INTERVAL_S)
|
||
while True:
|
||
if progress_cb:
|
||
try:
|
||
await progress_cb({
|
||
"elapsed_s": round(time.time() - started, 1),
|
||
"tail": "\n".join(list(tail)),
|
||
})
|
||
except Exception:
|
||
# Progress is best-effort — never let a UI hiccup
|
||
# break the underlying subprocess.
|
||
pass
|
||
await asyncio.sleep(PROGRESS_INTERVAL_S)
|
||
|
||
rd_out = asyncio.create_task(_reader(proc.stdout, stdout_full, "out"))
|
||
rd_err = asyncio.create_task(_reader(proc.stderr, stderr_full, "err"))
|
||
prog_task = asyncio.create_task(_progress_emitter()) if progress_cb else None
|
||
|
||
timed_out = False
|
||
try:
|
||
await asyncio.wait_for(proc.wait(), timeout=timeout)
|
||
except asyncio.TimeoutError:
|
||
timed_out = True
|
||
try:
|
||
proc.kill()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
await asyncio.wait_for(proc.wait(), timeout=2)
|
||
except Exception:
|
||
pass
|
||
except asyncio.CancelledError:
|
||
# User hit stop / SSE stream torn down. Kill the child so it
|
||
# doesn't keep running orphaned. Re-raise so the agent loop's
|
||
# cancellation propagates as the user expects.
|
||
try:
|
||
proc.kill()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
await asyncio.wait_for(proc.wait(), timeout=2)
|
||
except Exception:
|
||
pass
|
||
# Best-effort: stop the readers + emitter before re-raising.
|
||
for t in (rd_out, rd_err):
|
||
t.cancel()
|
||
if prog_task is not None:
|
||
prog_task.cancel()
|
||
raise
|
||
finally:
|
||
if prog_task is not None and not prog_task.done():
|
||
prog_task.cancel()
|
||
try:
|
||
await prog_task
|
||
except (asyncio.CancelledError, Exception):
|
||
pass
|
||
# Wait for readers to finish draining the pipes.
|
||
for t in (rd_out, rd_err):
|
||
try:
|
||
await asyncio.wait_for(t, timeout=1)
|
||
except Exception:
|
||
pass
|
||
|
||
return (
|
||
"\n".join(stdout_full),
|
||
"\n".join(stderr_full),
|
||
proc.returncode,
|
||
timed_out,
|
||
)
|
||
|
||
_ADMIN_TOOLS = {
|
||
"app_api",
|
||
"manage_endpoints",
|
||
"manage_mcp",
|
||
"manage_webhooks",
|
||
"manage_tokens",
|
||
"manage_settings",
|
||
"download_model",
|
||
"serve_model",
|
||
"serve_preset",
|
||
"stop_served_model",
|
||
"cancel_download",
|
||
}
|
||
|
||
|
||
def _owner_is_admin(owner: Optional[str]) -> bool:
|
||
"""Mirror route-level admin behavior for agent tool execution."""
|
||
return owner_is_admin_or_single_user(owner)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# MCP-backed tool helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Map legacy tool names -> (MCP server_id, MCP tool_name)
|
||
_MCP_TOOL_MAP = {
|
||
"bash": ("bash", "bash"),
|
||
"python": ("python", "python"),
|
||
"read_file": ("filesystem", "read_file"),
|
||
"write_file": ("filesystem", "write_file"),
|
||
"web_search": ("web_search", "web_search"),
|
||
"web_fetch": ("web_fetch", "web_fetch"),
|
||
"generate_image": ("image_gen", "generate_image"),
|
||
}
|
||
|
||
|
||
def _parse_generate_image(content: str) -> Dict:
|
||
lines = content.strip().split("\n")
|
||
args = {"prompt": lines[0].strip() if lines else ""}
|
||
for i, key in enumerate(["model", "size", "quality"], 1):
|
||
if len(lines) > i and lines[i].strip():
|
||
args[key] = lines[i].strip()
|
||
return args
|
||
|
||
|
||
def _parse_manage_memory(content: str) -> Dict:
|
||
lines = content.strip().split("\n")
|
||
action = lines[0].strip().lower() if lines else ""
|
||
args = {"action": action}
|
||
if action == "add":
|
||
args["text"] = lines[1].strip() if len(lines) > 1 else ""
|
||
if len(lines) > 2 and lines[2].strip():
|
||
args["category"] = lines[2].strip().lower()
|
||
elif action == "edit":
|
||
args["memory_id"] = lines[1].strip() if len(lines) > 1 else ""
|
||
args["text"] = lines[2].strip() if len(lines) > 2 else ""
|
||
elif action == "delete":
|
||
args["memory_id"] = lines[1].strip() if len(lines) > 1 else ""
|
||
elif action == "search":
|
||
args["text"] = lines[1].strip() if len(lines) > 1 else ""
|
||
elif action == "list":
|
||
if len(lines) > 1 and lines[1].strip():
|
||
args["category"] = lines[1].strip().lower()
|
||
return args
|
||
|
||
|
||
def _parse_write_file(content: str) -> Dict:
|
||
lines = content.split("\n", 1)
|
||
return {"path": lines[0].strip(), "content": lines[1] if len(lines) > 1 else ""}
|
||
|
||
|
||
_MCP_ARG_PARSERS: Dict[str, callable] = {
|
||
"bash": lambda c: {"command": c},
|
||
"python": lambda c: {"code": c},
|
||
"web_search": lambda c: {"query": c.split("\n")[0].strip()},
|
||
"web_fetch": lambda c: {"url": c.split("\n")[0].strip()},
|
||
"read_file": lambda c: {"path": c.split("\n")[0].strip()},
|
||
"write_file": _parse_write_file,
|
||
"generate_image": _parse_generate_image,
|
||
"manage_memory": _parse_manage_memory,
|
||
}
|
||
|
||
|
||
def _build_mcp_args(tool: str, content: str) -> Dict:
|
||
"""Convert fenced-block text content to structured MCP arguments."""
|
||
parser = _MCP_ARG_PARSERS.get(tool)
|
||
return parser(content) if parser else {}
|
||
|
||
|
||
async def _call_mcp_tool(
|
||
tool: str,
|
||
content: str,
|
||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||
workspace: Optional[str] = None,
|
||
) -> Dict:
|
||
"""Route a legacy tool call through the MCP manager, with direct fallbacks."""
|
||
mcp = get_mcp_manager()
|
||
if not mcp:
|
||
return await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) or {"error": f"MCP manager not available for tool '{tool}'", "exit_code": 1}
|
||
|
||
server_id, tool_name = _MCP_TOOL_MAP[tool]
|
||
qualified = f"mcp__{server_id}__{tool_name}"
|
||
args = _build_mcp_args(tool, content)
|
||
result = await mcp.call_tool(qualified, args)
|
||
|
||
# If MCP server not connected, try direct fallback
|
||
if isinstance(result, dict) and result.get("exit_code") == 1 and "not connected" in result.get("error", ""):
|
||
fallback = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||
if fallback:
|
||
return fallback
|
||
|
||
return result
|
||
|
||
|
||
_BG_MARKERS = {"#!bg", "#bg", "# bg", "#background", "# background", "@background", "# @background"}
|
||
|
||
|
||
def _split_bg_marker(content: str):
|
||
"""If the bash content's first non-empty line is a background marker
|
||
(e.g. `#!bg`), return (True, command_without_marker); else (False, content)."""
|
||
lines = content.split("\n")
|
||
i = 0
|
||
while i < len(lines) and not lines[i].strip():
|
||
i += 1
|
||
if i < len(lines) and lines[i].strip().lower() in _BG_MARKERS:
|
||
del lines[i]
|
||
return True, "\n".join(lines).strip()
|
||
return False, content
|
||
|
||
|
||
async def _direct_fallback(
|
||
tool: str,
|
||
content: str,
|
||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||
workspace: Optional[str] = None,
|
||
) -> Optional[Dict]:
|
||
"""In-process execution path for the eight tools that used to live as
|
||
stdio MCP servers under mcp_servers/. Those servers were deleted in
|
||
favor of native execution; this function is now the canonical path,
|
||
not a fallback. The name is kept for backwards compat with callers.
|
||
|
||
`progress_cb` is called periodically while bash/python subprocesses
|
||
are still running, with `{elapsed_s, tail}` payloads. Other tools
|
||
ignore it.
|
||
"""
|
||
import json as _json
|
||
|
||
# Inherit env + force a sane terminal so subprocesses that touch
|
||
# terminfo (anything calling `clear`, `tput`, `os.system("clear")`,
|
||
# or scripts that probe $TERM) don't spam "TERM environment variable
|
||
# not set" errors. The agent's bash/python tool calls run with PIPE
|
||
# stdin/stdout (no real TTY), so curses/termios still won't work —
|
||
# but at least non-interactive code with incidental TERM lookups
|
||
# stops failing. COLUMNS/LINES give terminal-width-aware tools (less,
|
||
# rich, etc.) reasonable defaults instead of 0×0.
|
||
_subproc_env = {
|
||
**os.environ,
|
||
"TERM": "xterm-256color",
|
||
"COLUMNS": "120",
|
||
"LINES": "40",
|
||
"HOME": _AGENT_WORKDIR,
|
||
}
|
||
|
||
try:
|
||
if tool == "bash":
|
||
proc = await asyncio.create_subprocess_shell(
|
||
content,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
env=_subproc_env,
|
||
cwd=workspace or _AGENT_WORKDIR,
|
||
)
|
||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||
proc,
|
||
timeout=DEFAULT_BASH_TIMEOUT,
|
||
progress_cb=progress_cb,
|
||
)
|
||
if timed_out:
|
||
return {"error": f"bash: timed out after {DEFAULT_BASH_TIMEOUT}s — process killed", "exit_code": 124, "stdout": _truncate(stdout, MAX_OUTPUT_CHARS), "stderr": _truncate(stderr, MAX_OUTPUT_CHARS)}
|
||
output = stdout.rstrip()
|
||
err = stderr.rstrip()
|
||
if err:
|
||
output = (output + "\nSTDERR: " + err).strip() if output else "STDERR: " + err
|
||
output = _truncate(output, MAX_OUTPUT_CHARS)
|
||
return {"output": output or "(no output)", "exit_code": rc or 0}
|
||
|
||
if tool == "python":
|
||
# Run user code in a subprocess so an infinite loop or crash
|
||
# can't take the whole server down. -I = isolated mode (skip
|
||
# user site, no PYTHONPATH inheritance) for hygiene.
|
||
proc = await asyncio.create_subprocess_exec(
|
||
# Use the running interpreter — there is no `python3.exe` on
|
||
# Windows, which made the agent's `python` tool fail there.
|
||
(sys.executable or "python"), "-I", "-c", content,
|
||
stdout=asyncio.subprocess.PIPE,
|
||
stderr=asyncio.subprocess.PIPE,
|
||
env=_subproc_env,
|
||
cwd=workspace or _AGENT_WORKDIR,
|
||
)
|
||
stdout, stderr, rc, timed_out = await _run_subprocess_streaming(
|
||
proc,
|
||
timeout=DEFAULT_PYTHON_TIMEOUT,
|
||
progress_cb=progress_cb,
|
||
)
|
||
if timed_out:
|
||
return {"error": f"python: timed out after {DEFAULT_PYTHON_TIMEOUT}s — process killed", "exit_code": 124, "stdout": _truncate(stdout, MAX_OUTPUT_CHARS), "stderr": _truncate(stderr, MAX_OUTPUT_CHARS)}
|
||
output = stdout.rstrip()
|
||
err = stderr.rstrip()
|
||
if err:
|
||
output = (output + "\nSTDERR: " + err).strip() if output else "STDERR: " + err
|
||
output = _truncate(output, MAX_OUTPUT_CHARS)
|
||
return {"output": output or "(no output)", "exit_code": rc or 0}
|
||
|
||
if tool == "read_file":
|
||
# Args: plain path on line 1 (back-compat) OR JSON
|
||
# {path, offset?, limit?} where offset/limit are a 1-based line range.
|
||
raw_path, offset, limit = content.split("\n", 1)[0].strip(), 0, 0
|
||
_stripped = content.strip()
|
||
if _stripped.startswith("{"):
|
||
try:
|
||
_a = _json.loads(_stripped)
|
||
raw_path = str(_a.get("path", "")).strip()
|
||
offset = int(_a.get("offset") or 0)
|
||
limit = int(_a.get("limit") or 0)
|
||
except (_json.JSONDecodeError, TypeError, ValueError):
|
||
pass
|
||
try:
|
||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||
if workspace else _resolve_tool_path(raw_path))
|
||
except ValueError as e:
|
||
return {"error": f"read_file: {e}", "exit_code": 1}
|
||
try:
|
||
# Run blocking read in a thread to keep the loop responsive.
|
||
def _read():
|
||
if offset > 0 or limit > 0:
|
||
# Line-range read: slice [offset, offset+limit).
|
||
start = max(offset, 1)
|
||
out, n, budget = [], 0, MAX_READ_CHARS
|
||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||
for i, line in enumerate(f, 1):
|
||
if i < start:
|
||
continue
|
||
if limit > 0 and n >= limit:
|
||
break
|
||
out.append(line)
|
||
n += 1
|
||
budget -= len(line)
|
||
if budget <= 0:
|
||
out.append(f"\n... [truncated at {MAX_READ_CHARS} chars]")
|
||
break
|
||
return "".join(out)
|
||
with open(path, "r", encoding="utf-8", errors="replace") as f:
|
||
return f.read(MAX_READ_CHARS + 1)
|
||
data = await asyncio.to_thread(_read)
|
||
except FileNotFoundError:
|
||
return {"error": f"read_file: {path}: not found", "exit_code": 1}
|
||
except PermissionError:
|
||
return {"error": f"read_file: {path}: permission denied", "exit_code": 1}
|
||
except IsADirectoryError:
|
||
return {"error": f"read_file: {path}: is a directory (use ls)", "exit_code": 1}
|
||
except OSError as e:
|
||
return {"error": f"read_file: {path}: {e}", "exit_code": 1}
|
||
if not (offset > 0 or limit > 0) and len(data) > MAX_READ_CHARS:
|
||
data = data[:MAX_READ_CHARS] + f"\n... [truncated at {MAX_READ_CHARS} chars]"
|
||
return {"output": data, "exit_code": 0}
|
||
|
||
if tool == "write_file":
|
||
lines = content.split("\n", 1)
|
||
raw_path = lines[0].strip()
|
||
body = lines[1] if len(lines) > 1 else ""
|
||
try:
|
||
path = (_resolve_tool_path_in_workspace(workspace, raw_path)
|
||
if workspace else _resolve_tool_path(raw_path))
|
||
except ValueError as e:
|
||
return {"error": f"write_file: {e}", "exit_code": 1}
|
||
try:
|
||
def _write():
|
||
# Capture prior content (best-effort, text) so we can show a
|
||
# before/after diff. Missing/binary file → treat as empty.
|
||
old = ""
|
||
try:
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
old = f.read()
|
||
except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError):
|
||
old = ""
|
||
d = os.path.dirname(path)
|
||
if d:
|
||
os.makedirs(d, exist_ok=True)
|
||
with open(path, "w", encoding="utf-8") as f:
|
||
f.write(body)
|
||
return old, len(body)
|
||
old_content, size = await asyncio.to_thread(_write)
|
||
except PermissionError:
|
||
return {"error": f"write_file: {path}: permission denied", "exit_code": 1}
|
||
except OSError as e:
|
||
return {"error": f"write_file: {path}: {e}", "exit_code": 1}
|
||
diff = _unified_diff(old_content, body, path)
|
||
result = {"output": f"Wrote {size} bytes to {path}", "exit_code": 0}
|
||
if diff:
|
||
result["diff"] = diff
|
||
return result
|
||
|
||
if tool == "grep":
|
||
# Args (JSON): {pattern, path?, glob?, ignore_case?, max_results?}.
|
||
# Bare string → treated as the pattern.
|
||
args: Dict[str, Any] = {}
|
||
_s = (content or "").strip()
|
||
if _s.startswith("{"):
|
||
try:
|
||
args = _json.loads(_s)
|
||
except _json.JSONDecodeError:
|
||
args = {}
|
||
else:
|
||
args = {"pattern": _s}
|
||
pattern = str(args.get("pattern", "")).strip()
|
||
if not pattern:
|
||
return {"error": "grep: pattern is required", "exit_code": 1}
|
||
ignore_case = bool(args.get("ignore_case"))
|
||
glob_pat = str(args.get("glob", "") or "").strip()
|
||
try:
|
||
max_hits = int(args.get("max_results") or _CODENAV_MAX_HITS)
|
||
except (TypeError, ValueError):
|
||
max_hits = _CODENAV_MAX_HITS
|
||
max_hits = max(1, min(max_hits, _CODENAV_MAX_HITS))
|
||
try:
|
||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||
except ValueError as e:
|
||
return {"error": f"grep: {e}", "exit_code": 1}
|
||
|
||
def _grep():
|
||
import re as _re
|
||
import shutil
|
||
rg = shutil.which("rg")
|
||
if rg:
|
||
cmd = [rg, "--line-number", "--no-heading", "--color=never",
|
||
"--max-count", str(max_hits)]
|
||
if ignore_case:
|
||
cmd.append("--ignore-case")
|
||
if glob_pat:
|
||
cmd += ["--glob", glob_pat]
|
||
# Exclude junk dirs even when the tree has no .gitignore, so
|
||
# results match the Python fallback's skip set.
|
||
for _d in _CODENAV_SKIP_DIRS:
|
||
cmd += ["--glob", f"!**/{_d}/**"]
|
||
cmd += ["--regexp", pattern, root]
|
||
try:
|
||
import subprocess
|
||
p = subprocess.run(cmd, capture_output=True, text=True, timeout=20)
|
||
lines = [ln for ln in (p.stdout or "").splitlines() if ln][:max_hits]
|
||
return lines, None
|
||
except subprocess.TimeoutExpired:
|
||
return None, "grep: timed out"
|
||
except Exception as _e:
|
||
return None, f"grep: {_e}"
|
||
# Python fallback (no ripgrep): walk + regex.
|
||
try:
|
||
rx = _re.compile(pattern, _re.IGNORECASE if ignore_case else 0)
|
||
except _re.error as _e:
|
||
return None, f"grep: bad pattern: {_e}"
|
||
import fnmatch
|
||
hits = []
|
||
if os.path.isfile(root):
|
||
file_iter = [root]
|
||
else:
|
||
file_iter = []
|
||
for dp, dns, fns in os.walk(root):
|
||
dns[:] = [d for d in dns if d not in _CODENAV_SKIP_DIRS]
|
||
for fn in fns:
|
||
if glob_pat and not fnmatch.fnmatch(fn, glob_pat):
|
||
continue
|
||
file_iter.append(os.path.join(dp, fn))
|
||
for fp in file_iter:
|
||
if len(hits) >= max_hits:
|
||
break
|
||
try:
|
||
with open(fp, "r", encoding="utf-8", errors="strict") as f:
|
||
for i, line in enumerate(f, 1):
|
||
if rx.search(line):
|
||
hits.append(f"{fp}:{i}:{line.rstrip()[:_CODENAV_MAX_LINE]}")
|
||
if len(hits) >= max_hits:
|
||
break
|
||
except (UnicodeDecodeError, OSError):
|
||
continue # skip binary / unreadable
|
||
return hits, None
|
||
|
||
lines, err = await asyncio.to_thread(_grep)
|
||
if err:
|
||
return {"error": err, "exit_code": 1}
|
||
if not lines:
|
||
return {"output": f"No matches for {pattern!r} under {root}", "exit_code": 0}
|
||
out = "\n".join(ln[:_CODENAV_MAX_LINE] for ln in lines)
|
||
if len(lines) >= max_hits:
|
||
out += f"\n... [capped at {max_hits} matches]"
|
||
return {"output": _truncate(out), "exit_code": 0}
|
||
|
||
if tool == "glob":
|
||
args = {}
|
||
_s = (content or "").strip()
|
||
if _s.startswith("{"):
|
||
try:
|
||
args = _json.loads(_s)
|
||
except _json.JSONDecodeError:
|
||
args = {}
|
||
else:
|
||
args = {"pattern": _s}
|
||
pattern = str(args.get("pattern", "")).strip()
|
||
if not pattern:
|
||
return {"error": "glob: pattern is required", "exit_code": 1}
|
||
try:
|
||
root = _resolve_search_root(str(args.get("path", "")), workspace)
|
||
except ValueError as e:
|
||
return {"error": f"glob: {e}", "exit_code": 1}
|
||
|
||
def _glob():
|
||
from pathlib import Path
|
||
base = Path(root)
|
||
if not base.is_dir():
|
||
return None, f"glob: {root}: not a directory"
|
||
matched = []
|
||
try:
|
||
for p in base.rglob(pattern):
|
||
if set(p.relative_to(base).parts) & _CODENAV_SKIP_DIRS:
|
||
continue
|
||
try:
|
||
mtime = p.stat().st_mtime
|
||
except OSError:
|
||
mtime = 0
|
||
matched.append((mtime, str(p)))
|
||
if len(matched) > _CODENAV_MAX_HITS * 5:
|
||
break
|
||
except (OSError, ValueError) as _e:
|
||
return None, f"glob: {_e}"
|
||
matched.sort(key=lambda t: t[0], reverse=True) # newest first
|
||
return [pth for _, pth in matched[:_CODENAV_MAX_HITS]], None
|
||
|
||
paths, err = await asyncio.to_thread(_glob)
|
||
if err:
|
||
return {"error": err, "exit_code": 1}
|
||
if not paths:
|
||
return {"output": f"No files matching {pattern!r} under {root}", "exit_code": 0}
|
||
out = "\n".join(paths)
|
||
if len(paths) >= _CODENAV_MAX_HITS:
|
||
out += f"\n... [capped at {_CODENAV_MAX_HITS} files]"
|
||
return {"output": _truncate(out), "exit_code": 0}
|
||
|
||
if tool == "ls":
|
||
raw_path = ""
|
||
_s = (content or "").strip()
|
||
if _s.startswith("{"):
|
||
try:
|
||
raw_path = str(_json.loads(_s).get("path", "")).strip()
|
||
except _json.JSONDecodeError:
|
||
raw_path = ""
|
||
else:
|
||
raw_path = _s.split("\n", 1)[0].strip()
|
||
try:
|
||
root = _resolve_search_root(raw_path, workspace)
|
||
except ValueError as e:
|
||
return {"error": f"ls: {e}", "exit_code": 1}
|
||
|
||
def _ls():
|
||
if not os.path.isdir(root):
|
||
return None, f"ls: {root}: not a directory"
|
||
rows = []
|
||
try:
|
||
with os.scandir(root) as it:
|
||
for entry in it:
|
||
if entry.name.startswith("."):
|
||
continue
|
||
try:
|
||
is_dir = entry.is_dir(follow_symlinks=False)
|
||
size = entry.stat(follow_symlinks=False).st_size if not is_dir else 0
|
||
except OSError:
|
||
continue
|
||
rows.append((is_dir, entry.name, size))
|
||
except (PermissionError, OSError) as _e:
|
||
return None, f"ls: {_e}"
|
||
rows.sort(key=lambda r: (not r[0], r[1].lower())) # dirs first, then name
|
||
lines = [f"{root}:"]
|
||
for is_dir, name, size in rows[:_CODENAV_MAX_HITS]:
|
||
lines.append(f" {name}/" if is_dir else f" {name} ({size} B)")
|
||
if len(rows) > _CODENAV_MAX_HITS:
|
||
lines.append(f" ... [{len(rows) - _CODENAV_MAX_HITS} more]")
|
||
if not rows:
|
||
lines.append(" (empty)")
|
||
return "\n".join(lines), None
|
||
|
||
out, err = await asyncio.to_thread(_ls)
|
||
if err:
|
||
return {"error": err, "exit_code": 1}
|
||
return {"output": _truncate(out), "exit_code": 0}
|
||
|
||
if tool == "web_search":
|
||
from src.search import comprehensive_web_search
|
||
raw = content.strip()
|
||
query = raw
|
||
time_filter = None
|
||
max_pages = 5
|
||
# Allow JSON-shaped args: {"query": "...", "time_filter": "day", "max_pages": 7}
|
||
if raw.startswith("{"):
|
||
try:
|
||
parsed = _json.loads(raw)
|
||
if isinstance(parsed, dict) and "query" in parsed:
|
||
query = str(parsed.get("query", "")).strip()
|
||
tf = parsed.get("time_filter") or parsed.get("freshness")
|
||
if isinstance(tf, str) and tf.lower() in ("day", "week", "month", "year"):
|
||
time_filter = tf.lower()
|
||
mp = parsed.get("max_pages")
|
||
if isinstance(mp, int) and 1 <= mp <= 10:
|
||
max_pages = mp
|
||
except _json.JSONDecodeError:
|
||
pass
|
||
if not query:
|
||
query = raw.split("\n")[0].strip()
|
||
# Auto-detect freshness from query phrasing when not explicit
|
||
if time_filter is None:
|
||
q_lc = query.lower()
|
||
if any(kw in q_lc for kw in ("today", "latest", "breaking", "this morning", "right now", "currently")):
|
||
time_filter = "day"
|
||
elif any(kw in q_lc for kw in ("this week", "past week", "recent news", "last few days")):
|
||
time_filter = "week"
|
||
elif any(kw in q_lc for kw in ("this month", "past month")):
|
||
time_filter = "month"
|
||
elif " news" in q_lc or q_lc.startswith("news ") or q_lc.endswith(" news"):
|
||
time_filter = "week"
|
||
loop = asyncio.get_running_loop()
|
||
text, sources = await asyncio.wait_for(
|
||
loop.run_in_executor(
|
||
None,
|
||
lambda: comprehensive_web_search(
|
||
query,
|
||
max_pages=max_pages,
|
||
time_filter=time_filter,
|
||
return_sources=True,
|
||
),
|
||
),
|
||
timeout=30,
|
||
)
|
||
output = text[:MAX_OUTPUT_CHARS] if len(text) > MAX_OUTPUT_CHARS else text
|
||
if sources:
|
||
output += "\n\n<!-- SOURCES:" + _json.dumps(sources) + " -->"
|
||
return {"output": output, "exit_code": 0}
|
||
|
||
if tool == "web_fetch":
|
||
# Lightweight single-URL fetch. Wraps the SSRF-safe fetcher used
|
||
# by deep research, so private/loopback/metadata addresses are
|
||
# already blocked there.
|
||
from src.search.content import fetch_webpage_content
|
||
raw = content.strip()
|
||
url = ""
|
||
# Accept either a JSON arg ({"url": "..."}) or a plain URL/domain.
|
||
if raw.startswith("{"):
|
||
try:
|
||
parsed = _json.loads(raw)
|
||
if isinstance(parsed, dict):
|
||
url = str(parsed.get("url") or "").strip()
|
||
except _json.JSONDecodeError:
|
||
url = ""
|
||
if not url:
|
||
# Non-JSON (or JSON without a usable url): take the first line
|
||
# only, so a URL followed by commentary still parses.
|
||
url = raw.split("\n")[0].strip()
|
||
# Reject anything that isn't a single bare URL/domain token.
|
||
if not url or url.startswith("{") or any(c in url for c in (" ", "\t", "\n")):
|
||
return {"error": "web_fetch: provide a single URL or domain, e.g. example.com", "exit_code": 1}
|
||
low = url.lower()
|
||
if "://" in low and not low.startswith(("http://", "https://")):
|
||
return {"error": f"web_fetch: unsupported URL scheme (only http/https): {url[:80]}", "exit_code": 1}
|
||
# Accept bare domains like "example.com" by defaulting to https.
|
||
if not low.startswith(("http://", "https://")):
|
||
url = "https://" + url
|
||
loop = asyncio.get_running_loop()
|
||
try:
|
||
result = await asyncio.wait_for(
|
||
loop.run_in_executor(None, lambda: fetch_webpage_content(url, timeout=10)),
|
||
timeout=30,
|
||
)
|
||
except asyncio.TimeoutError:
|
||
return {"error": f"web_fetch: timed out fetching {url}", "exit_code": 1}
|
||
except Exception as e:
|
||
# Direct URL fetches can hit bot protection / auth walls
|
||
# (e.g. eBay 403). Treat that as a tool failure the model can
|
||
# reason around, not an uncaught chat-stream 500.
|
||
return {"error": f"web_fetch: {url}: {e}", "exit_code": 1}
|
||
err = result.get("error")
|
||
text = (result.get("content") or "").strip()
|
||
title = result.get("title") or ""
|
||
|
||
if not text:
|
||
if err:
|
||
return {"error": f"web_fetch: {url}: {err}", "exit_code": 1}
|
||
# No extractable text: non-HTML body, or a pure client-rendered
|
||
# shell. The agent can fall back to the builtin_browser tool.
|
||
return {"error": f"web_fetch: {url}: no readable text content (not HTML, or the page needs JS/login)", "exit_code": 1}
|
||
|
||
header = (f"# {title}\n" if title else "") + f"Source: {url}\n\n"
|
||
output = header + text
|
||
if len(output) > MAX_OUTPUT_CHARS:
|
||
output = output[:MAX_OUTPUT_CHARS] + "\n\n[...truncated]"
|
||
return {"output": output, "exit_code": 0}
|
||
|
||
# manage_memory / generate_image still live as MCP servers
|
||
# (mcp_servers/{memory,image_gen}_server.py); the MCP path above
|
||
# handles them.
|
||
except Exception as e:
|
||
return {"error": f"{tool}: {e}", "exit_code": 1}
|
||
|
||
return None
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Dispatcher
|
||
# ---------------------------------------------------------------------------
|
||
|
||
async def execute_tool_block(
|
||
block: Any,
|
||
session_id: Optional[str] = None,
|
||
disabled_tools: Optional[set] = None,
|
||
owner: Optional[str] = None,
|
||
progress_cb: Optional[Callable[[Dict], Awaitable[None]]] = None,
|
||
workspace: Optional[str] = None,
|
||
) -> Tuple[str, Dict]:
|
||
"""Execute a single tool block. Returns (description, result_dict).
|
||
|
||
`progress_cb` is forwarded to long-running subprocess tools
|
||
(bash, python) so the agent loop can emit `tool_progress` SSE
|
||
events while the command is in flight. Ignored by other tools.
|
||
"""
|
||
from src.tool_implementations import (
|
||
do_create_document, do_update_document, do_edit_document,
|
||
do_suggest_document, do_search_chats, do_manage_tasks,
|
||
do_manage_skills, do_api_call, do_manage_endpoints,
|
||
do_manage_mcp, do_manage_webhooks, do_manage_tokens,
|
||
do_manage_documents, do_manage_settings, do_manage_notes,
|
||
do_manage_calendar,
|
||
do_download_model, do_serve_model, do_list_served_models, do_stop_served_model,
|
||
do_list_downloads, do_cancel_download, do_search_hf_models, do_list_cached_models,
|
||
do_list_serve_presets, do_serve_preset, do_adopt_served_model,
|
||
do_list_cookbook_servers,
|
||
do_edit_image, do_trigger_research, do_manage_research, do_resolve_contact,
|
||
do_manage_contact,
|
||
do_vault_search, do_vault_get, do_vault_unlock,
|
||
do_app_api,
|
||
)
|
||
|
||
tool = block.tool_type
|
||
content = block.content
|
||
|
||
# Misformatted tool call detection: model put JSON inside ```python``` (or
|
||
# similar) without naming the tool. Common with MiniMax-style outputs.
|
||
# Return a helpful error so the model retries with the correct format.
|
||
if tool in ("python", "json", "xml") and content.strip().startswith("{") and content.strip().endswith("}"):
|
||
try:
|
||
import json as _json
|
||
parsed = _json.loads(content.strip())
|
||
if isinstance(parsed, dict):
|
||
desc = f"{tool}: misformatted tool call"
|
||
result = {
|
||
"error": (
|
||
f"You wrote a JSON object inside a ```{tool}``` block, but that's not a tool call.\n"
|
||
"To call a tool, use the tool name as the fence tag, e.g.\n"
|
||
"```resolve_contact\n"
|
||
"{\"name\": \"...\"}\n"
|
||
"```\n"
|
||
"or\n"
|
||
"```send_email\n"
|
||
"{\"to\": \"...\", \"subject\": \"...\", \"body\": \"...\"}\n"
|
||
"```"
|
||
),
|
||
"exit_code": 1,
|
||
}
|
||
return desc, result
|
||
except (ValueError, TypeError):
|
||
pass
|
||
|
||
# Reject tools that the user has disabled for this request
|
||
if disabled_tools and tool in disabled_tools:
|
||
desc = f"{tool}: BLOCKED"
|
||
result = {"error": f"Tool '{tool}' is disabled by user.", "exit_code": 1}
|
||
logger.info(f"Tool blocked by user: {tool}")
|
||
return desc, result
|
||
|
||
if tool in _ADMIN_TOOLS and not _owner_is_admin(owner):
|
||
desc = f"{tool}: BLOCKED"
|
||
result = {"error": f"Tool '{tool}' requires an admin user.", "exit_code": 1}
|
||
logger.warning("Admin tool blocked for non-admin owner=%r tool=%s", owner, tool)
|
||
return desc, result
|
||
|
||
if is_public_blocked_tool(tool) and not _owner_is_admin(owner):
|
||
desc = f"{tool}: BLOCKED"
|
||
result = {
|
||
"error": (
|
||
f"Tool '{tool}' is restricted to admin users on this deployment. "
|
||
"Ask an admin to perform this action or grant the needed permission."
|
||
),
|
||
"exit_code": 1,
|
||
}
|
||
logger.warning("Public tool policy blocked owner=%r tool=%s", owner, tool)
|
||
return desc, result
|
||
|
||
# Background execution: a `bash` block whose first line is the `#!bg`
|
||
# marker runs DETACHED — returns a job id immediately so the chat stream
|
||
# isn't held open for a multi-minute install/ffmpeg/download. The always-on
|
||
# monitor re-invokes the agent with the full output when the job finishes.
|
||
if tool == "bash" and session_id:
|
||
_is_bg, _bg_cmd = _split_bg_marker(content)
|
||
if _is_bg and _bg_cmd:
|
||
from src import bg_jobs
|
||
rec = bg_jobs.launch(_bg_cmd, session_id=session_id, cwd=workspace or _AGENT_WORKDIR)
|
||
short = _bg_cmd.strip().split(chr(10))[0][:80]
|
||
desc = f"bash (background): {short}"
|
||
result = {
|
||
"output": (
|
||
f"Started background job `{rec['id']}`. It is running detached — "
|
||
f"do NOT wait for it or poll it. You will be automatically re-invoked "
|
||
f"with its full output when it finishes. Continue with other work, or "
|
||
f"end your turn now and resume when the result arrives."
|
||
),
|
||
"exit_code": 0,
|
||
"bg_job_id": rec["id"],
|
||
}
|
||
logger.info(f"Tool executed: {desc} -> bg job {rec['id']}")
|
||
return desc, result
|
||
|
||
# Route MCP-extracted tools through the MCP manager. Forward
|
||
# the progress callback so long-running subprocess tools
|
||
# (bash, python) can stream `tool_progress` events to the UI.
|
||
if tool in _MCP_TOOL_MAP:
|
||
first_line = content.split(chr(10))[0][:80]
|
||
desc = f"{tool}: {first_line}"
|
||
result = await _call_mcp_tool(tool, content, progress_cb=progress_cb, workspace=workspace)
|
||
elif tool in ("grep", "glob", "ls"):
|
||
# Code-navigation tools — no MCP server; run the direct implementation.
|
||
# Confined to the workspace when one is set (same policy as read_file).
|
||
first_line = content.split(chr(10))[0][:80]
|
||
desc = f"{tool}: {first_line}"
|
||
result = await _direct_fallback(tool, content, progress_cb=progress_cb, workspace=workspace) \
|
||
or {"error": f"{tool}: execution failed", "exit_code": 1}
|
||
elif tool == "create_document":
|
||
title = content.split("\n")[0].strip()[:60]
|
||
desc = f"create_document: {title}"
|
||
result = await do_create_document(content, session_id=session_id, owner=owner)
|
||
elif tool == "update_document":
|
||
desc = f"update_document: {content.split(chr(10))[0][:60]}"
|
||
result = await do_update_document(content, owner=owner)
|
||
elif tool == "edit_document":
|
||
result = await do_edit_document(content, owner=owner)
|
||
desc = f"edit_document: {result.get('title', '')}"
|
||
elif tool == "suggest_document":
|
||
result = await do_suggest_document(content, owner=owner)
|
||
desc = f"suggest_document: {result.get('count', 0)} suggestions"
|
||
elif tool == "search_chats":
|
||
query = content.split("\n")[0].strip()
|
||
desc = f"search_chats: {query[:80]}"
|
||
result = await do_search_chats(query, owner=owner)
|
||
elif tool in ("chat_with_model", "create_session", "list_sessions",
|
||
"send_to_session", "pipeline",
|
||
"manage_session", "manage_memory", "list_models",
|
||
"ui_control", "ask_teacher"):
|
||
from src.ai_interaction import dispatch_ai_tool
|
||
desc, result = await dispatch_ai_tool(tool, content, session_id, owner=owner)
|
||
elif tool == "manage_tasks":
|
||
desc = "manage_tasks"
|
||
result = await do_manage_tasks(content, owner=owner)
|
||
elif tool == "manage_skills":
|
||
desc = "manage_skills"
|
||
result = await do_manage_skills(content, owner=owner)
|
||
elif tool == "api_call":
|
||
first_line = content.split("\n")[0].strip()[:60]
|
||
desc = f"api_call: {first_line}"
|
||
result = await do_api_call(content)
|
||
elif tool == "manage_endpoints":
|
||
desc = "manage_endpoints"
|
||
result = await do_manage_endpoints(content, owner=owner)
|
||
elif tool == "manage_mcp":
|
||
desc = "manage_mcp"
|
||
result = await do_manage_mcp(content, owner=owner)
|
||
elif tool == "manage_webhooks":
|
||
desc = "manage_webhooks"
|
||
result = await do_manage_webhooks(content, owner=owner)
|
||
elif tool == "manage_tokens":
|
||
desc = "manage_tokens"
|
||
result = await do_manage_tokens(content, owner=owner)
|
||
elif tool == "manage_documents":
|
||
desc = "manage_documents"
|
||
result = await do_manage_documents(content, owner=owner)
|
||
elif tool == "manage_settings":
|
||
desc = "manage_settings"
|
||
result = await do_manage_settings(content, owner=owner)
|
||
elif tool == "manage_notes":
|
||
desc = "manage_notes"
|
||
result = await do_manage_notes(content, owner=owner)
|
||
elif tool == "manage_calendar":
|
||
desc = "manage_calendar"
|
||
result = await do_manage_calendar(content, owner=owner)
|
||
elif tool == "download_model":
|
||
desc = "download_model"
|
||
result = await do_download_model(content, owner=owner)
|
||
elif tool == "serve_model":
|
||
desc = "serve_model"
|
||
result = await do_serve_model(content, owner=owner)
|
||
elif tool == "list_served_models":
|
||
desc = "list_served_models"
|
||
result = await do_list_served_models(content, owner=owner)
|
||
elif tool == "stop_served_model":
|
||
desc = "stop_served_model"
|
||
result = await do_stop_served_model(content, owner=owner)
|
||
elif tool == "list_downloads":
|
||
desc = "list_downloads"
|
||
result = await do_list_downloads(content, owner=owner)
|
||
elif tool == "cancel_download":
|
||
desc = "cancel_download"
|
||
result = await do_cancel_download(content, owner=owner)
|
||
elif tool == "search_hf_models":
|
||
desc = "search_hf_models"
|
||
result = await do_search_hf_models(content, owner=owner)
|
||
elif tool == "list_cached_models":
|
||
desc = "list_cached_models"
|
||
result = await do_list_cached_models(content, owner=owner)
|
||
elif tool == "app_api":
|
||
desc = "app_api"
|
||
result = await do_app_api(content, owner=owner)
|
||
elif tool == "list_serve_presets":
|
||
desc = "list_serve_presets"
|
||
result = await do_list_serve_presets(content, owner=owner)
|
||
elif tool == "serve_preset":
|
||
desc = "serve_preset"
|
||
result = await do_serve_preset(content, owner=owner)
|
||
elif tool == "adopt_served_model":
|
||
desc = "adopt_served_model"
|
||
result = await do_adopt_served_model(content, owner=owner)
|
||
elif tool == "list_cookbook_servers":
|
||
desc = "list_cookbook_servers"
|
||
result = await do_list_cookbook_servers(content, owner=owner)
|
||
elif tool == "edit_image":
|
||
desc = "edit_image"
|
||
result = await do_edit_image(content, owner=owner)
|
||
elif tool == "edit_file":
|
||
result = await _do_edit_file(content, workspace=workspace)
|
||
desc = result.get("output") or result.get("error") or "edit_file"
|
||
elif tool == "trigger_research":
|
||
desc = "trigger_research"
|
||
result = await do_trigger_research(content, owner=owner)
|
||
elif tool == "manage_research":
|
||
desc = "manage_research"
|
||
result = await do_manage_research(content, owner=owner)
|
||
elif tool == "resolve_contact":
|
||
desc = "resolve_contact"
|
||
result = await do_resolve_contact(content, owner=owner)
|
||
elif tool == "manage_contact":
|
||
desc = "manage_contact"
|
||
result = await do_manage_contact(content, owner=owner)
|
||
elif tool == "vault_search":
|
||
desc = "vault_search"
|
||
result = await do_vault_search(content, owner=owner)
|
||
elif tool == "vault_get":
|
||
desc = "vault_get"
|
||
result = await do_vault_get(content, owner=owner)
|
||
elif tool == "vault_unlock":
|
||
desc = "vault_unlock"
|
||
result = await do_vault_unlock(content, owner=owner)
|
||
elif tool.startswith("mcp__"):
|
||
# MCP tool dispatch
|
||
mcp = get_mcp_manager()
|
||
if mcp:
|
||
try:
|
||
args = json.loads(content) if content.strip().startswith("{") else {}
|
||
except (json.JSONDecodeError, TypeError):
|
||
args = {}
|
||
desc = f"mcp: {tool}"
|
||
result = await mcp.call_tool(tool, args)
|
||
else:
|
||
desc = f"mcp: {tool}"
|
||
result = {"error": "MCP manager not available", "exit_code": 1}
|
||
else:
|
||
desc = f"unknown: {tool}"
|
||
result = {"error": f"Unknown tool type: {tool}", "exit_code": 1}
|
||
|
||
logger.info(f"Tool executed: {desc} -> exit_code={result.get('exit_code', 'n/a')}")
|
||
return desc, result
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Result formatting
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Keys handled by the dedicated branches below — never echo them as raw JSON.
|
||
_FORMATTER_HANDLED_KEYS = {
|
||
"stdout", "stderr", "exit_code", "content", "size",
|
||
"response", "results", "session_id", "name", "model", "session_name",
|
||
"success", "path", "action", "title", "doc_id", "version", "applied",
|
||
"error", "output",
|
||
}
|
||
|
||
|
||
def format_tool_result(description: str, result: Dict) -> str:
|
||
"""Format a tool result into text for feeding back to the LLM."""
|
||
parts = [f"### {description}"]
|
||
|
||
if "stdout" in result:
|
||
if result["stdout"]:
|
||
parts.append(f"**stdout:**\n```\n{result['stdout']}\n```")
|
||
if result["stderr"]:
|
||
parts.append(f"**stderr:**\n```\n{result['stderr']}\n```")
|
||
parts.append(f"**exit_code:** {result.get('exit_code', 'unknown')}")
|
||
elif "output" in result:
|
||
# bash / python canonical result shape: {"output": ..., "exit_code": ...}
|
||
parts.append(f"```\n{result['output']}\n```")
|
||
if result.get("exit_code") not in (0, None):
|
||
parts.append(f"**exit_code:** {result['exit_code']}")
|
||
elif "content" in result:
|
||
parts.append(f"**content ({result.get('size', '?')} chars):**\n```\n{result['content']}\n```")
|
||
elif "response" in result:
|
||
model = result.get("model", result.get("session_name", ""))
|
||
if model:
|
||
parts.append(f"**{model} responded:**\n{result['response']}")
|
||
else:
|
||
parts.append(result["response"])
|
||
elif "results" in result:
|
||
parts.append(result["results"])
|
||
elif "session_id" in result and "name" in result:
|
||
parts.append(f"Session created: **{result['name']}** (id: `{result['session_id']}`, model: {result.get('model', 'unknown')})")
|
||
elif "success" in result:
|
||
if result["success"]:
|
||
parts.append(f"File written: {result['path']} ({result['size']} bytes)")
|
||
else:
|
||
parts.append(f"Error: {result.get('error', 'unknown')}")
|
||
elif "action" in result:
|
||
action = result["action"]
|
||
if action == "create":
|
||
parts.append(f"Document created: \"{result.get('title', '')}\" (id: {result['doc_id']}, v{result['version']})")
|
||
elif action == "update":
|
||
parts.append(f"Document updated: \"{result.get('title', '')}\" (v{result['version']})")
|
||
elif action == "edit":
|
||
parts.append(f'Document edited: "{result.get("title", "")}" (v{result.get("version", "?")}, {result.get("applied", 0)} edit(s) applied)')
|
||
elif "error" in result:
|
||
parts.append(f"**Error:** {result['error']}")
|
||
|
||
# Surface any additional structured payload (events, tasks, notes, calendars,
|
||
# documents, attachments, etc.) that the dedicated branches above don't show.
|
||
# Without this, tools that return {"response": "...", "events": [...]} would
|
||
# silently drop the events list and the model would only see the summary line.
|
||
extra = {k: v for k, v in result.items() if k not in _FORMATTER_HANDLED_KEYS}
|
||
if extra:
|
||
try:
|
||
extra_json = json.dumps(extra, indent=2, default=str, ensure_ascii=False)
|
||
# Cap to avoid blowing the context window on huge payloads.
|
||
if len(extra_json) > 8000:
|
||
extra_json = extra_json[:8000] + f"\n... (truncated, {len(extra_json)} chars total)"
|
||
parts.append(f"**data:**\n```json\n{extra_json}\n```")
|
||
except (TypeError, ValueError):
|
||
pass
|
||
|
||
return "\n".join(parts)
|