Add SSRF-guarded web fetch agent tool

* feat(web-fetch): add web_fetch tool to read a specific URL's content

* test(web-fetch): add SSRF coverage and fail closed on empty DNS resolution

Add explicit SSRF regression tests for the web_fetch path covering
loopback, private LAN ranges, link-local/metadata, IPv6 private/local,
redirect-into-private, and unsupported schemes. Harden _public_http_url
to fail closed when a hostname resolves to no addresses.
This commit is contained in:
Rifqi Akram
2026-06-01 14:57:28 +07:00
committed by GitHub
parent 92c2392fd6
commit 5b1e56407b
11 changed files with 192 additions and 10 deletions

View File

@@ -389,6 +389,7 @@ def setup_chat_routes(
disabled_tools.add("bash")
if str(allow_web_search).lower() != "true":
disabled_tools.add("web_search")
disabled_tools.add("web_fetch")
# Nobody/incognito mode: deny tools that would expose the user's
# persistent memory, past chats, or other identity-linked data.
@@ -452,7 +453,7 @@ def setup_chat_routes(
disabled_tools.update(_compare_strip)
# In chat mode compare, disable ALL agent tools (no bash, python, file ops)
if chat_mode == 'chat':
disabled_tools.update({"bash", "python", "read_file", "write_file", "web_search", "search_chats", "manage_tasks"})
disabled_tools.update({"bash", "python", "read_file", "write_file", "web_search", "web_fetch", "search_chats", "manage_tasks"})
async def stream_with_save() -> AsyncGenerator[str, None]:
# _effective_mode is read-only here; closure captures it from

View File

@@ -199,6 +199,12 @@ Or with JSON for fresh news:
```
Search the web for a SINGLE quick fact/lookup mid-task. For news / "today" / "latest" queries, pass `time_filter` ("day", "week", "month", or "year"). NOT for "research X" / "do research on X" / "look into X" requests — those mean a multi-source DEEP RESEARCH job: use `trigger_research` instead (it runs in the Deep Research sidebar and produces a full report). web_search = one quick query; trigger_research = a researched report.""",
"web_fetch": """\
```web_fetch
<url or domain>
```
Fetch and read the text content of a SPECIFIC URL the user names (e.g. "check example.com", "what does this page say <url>"). A bare domain like `example.com` works (defaults to https). Use this when you already have a concrete URL. For open-ended lookups use `web_search`, and for "research X" jobs use `trigger_research`.""",
"read_file": """\
```read_file
<file path>

View File

@@ -26,7 +26,7 @@ MAX_OUTPUT_CHARS = 10_000
MAX_READ_CHARS = 20_000
# Tool types that trigger execution
TOOL_TAGS = {"bash", "python", "web_search", "read_file", "write_file",
TOOL_TAGS = {"bash", "python", "web_search", "web_fetch", "read_file", "write_file",
"create_document", "update_document", "edit_document",
"search_chats",
"chat_with_model", "create_session", "list_sessions",

View File

@@ -1,5 +1,6 @@
"""Webpage content fetching with caching, PDF extraction, and summarization helpers."""
import copy
import io
import ipaddress
import json
@@ -61,9 +62,12 @@ def _public_http_url(url: str) -> bool:
except ValueError:
pass
try:
return all(not _is_private_address(ip) for ip in _resolve_hostname_ips(host))
ips = _resolve_hostname_ips(host)
except OSError:
return False
# Fail closed: a hostname that resolves to nothing is treated as
# non-public (an empty all(...) would otherwise return True).
return bool(ips) and all(not _is_private_address(ip) for ip in ips)
def _get_public_url(url: str, *, headers: dict, timeout: int) -> httpx.Response:
@@ -297,7 +301,8 @@ def fetch_webpage_content(url: str, timeout: int = 5, retry_attempt: int = 0) ->
js_rendered = _detect_js_frameworks(soup)
js_message = "Page appears to be rendered by a JavaScript framework; content may be incomplete." if js_rendered else ""
# Main textual content (heuristic)
# Main textual content (heuristic): prefer semantic / "content"-classed
# containers to skip nav/footer/boilerplate; tuned for article pages.
main_content = ""
content_areas = soup.find_all(
["main", "article", "section", "div"],
@@ -306,12 +311,29 @@ def fetch_webpage_content(url: str, timeout: int = 5, retry_attempt: int = 0) ->
if content_areas:
for area in content_areas[:3]:
main_content += area.get_text(separator=" ", strip=True) + " "
if not main_content:
main_content = re.sub(r"\s+", " ", main_content).strip()
# The class heuristic can latch onto a small wrapper and miss the real
# content (app/landing pages, or SSR sites whose body isn't in a
# "content"-classed div, so these came back nearly empty before). When the
# heuristic returns nothing OR suspiciously little, fall back to the full
# <body>, stripping scripts/styles (so JSON/JS doesn't leak into the text)
# plus nav/header/footer/aside (boilerplate), and keep whichever yields
# more readable text.
THIN_CONTENT_CHARS = 600 # below this the heuristic likely missed the page
if len(main_content) < THIN_CONTENT_CHARS:
body = soup.find("body")
if body:
main_content = body.get_text(separator=" ", strip=True)
main_content = re.sub(r"\s+", " ", main_content).strip()
# Strip from a copy so the later list/table/code extractors still
# see the original soup unmodified.
body_copy = copy.copy(body)
for _noise in body_copy.find_all(
["script", "style", "noscript", "template", "nav", "header", "footer", "aside"]
):
_noise.extract()
body_text = re.sub(r"\s+", " ", body_copy.get_text(separator=" ", strip=True)).strip()
if len(body_text) > len(main_content):
main_content = body_text
result = {
"url": url,

View File

@@ -122,6 +122,7 @@ DEFAULT_SETTINGS = {
DEFAULT_FEATURES = {
"web_search": True,
"web_fetch": True,
"deep_research": False,
"memory": True,
"document_editor": True,

View File

@@ -2059,7 +2059,7 @@ class TaskScheduler:
"manage_calendar", "manage_notes", "manage_tasks", "manage_memory",
"list_email_accounts", "list_emails", "read_email", "send_email", "reply_to_email", "archive_email",
"mark_email_read", "delete_email", "resolve_contact",
"search_chats", "web_search", "read_file",
"search_chats", "web_search", "web_fetch", "read_file",
"create_document", "update_document", "edit_document",
"generate_image", "trigger_research",
"download_model", "serve_model", "list_served_models", "stop_served_model",

View File

@@ -195,6 +195,7 @@ _MCP_TOOL_MAP = {
"read_file": ("filesystem", "read_file"),
"write_file": ("filesystem", "write_file"),
"web_search": ("web_search", "web_search"),
"web_fetch": ("web_fetch", "web_fetch"),
"generate_image": ("image_gen", "generate_image"),
}
@@ -238,6 +239,7 @@ _MCP_ARG_PARSERS: Dict[str, callable] = {
"bash": lambda c: {"command": c},
"python": lambda c: {"code": c},
"web_search": lambda c: {"query": c.split("\n")[0].strip()},
"web_fetch": lambda c: {"url": c.split("\n")[0].strip()},
"read_file": lambda c: {"path": c.split("\n")[0].strip()},
"write_file": _parse_write_file,
"generate_image": _parse_generate_image,
@@ -464,6 +466,59 @@ async def _direct_fallback(
output += "\n\n<!-- SOURCES:" + _json.dumps(sources) + " -->"
return {"output": output, "exit_code": 0}
if tool == "web_fetch":
# Lightweight single-URL fetch. Wraps the SSRF-safe fetcher used
# by deep research, so private/loopback/metadata addresses are
# already blocked there.
from src.search.content import fetch_webpage_content
raw = content.strip()
url = ""
# Accept either a JSON arg ({"url": "..."}) or a plain URL/domain.
if raw.startswith("{"):
try:
parsed = _json.loads(raw)
if isinstance(parsed, dict):
url = str(parsed.get("url") or "").strip()
except _json.JSONDecodeError:
url = ""
if not url:
# Non-JSON (or JSON without a usable url): take the first line
# only, so a URL followed by commentary still parses.
url = raw.split("\n")[0].strip()
# Reject anything that isn't a single bare URL/domain token.
if not url or url.startswith("{") or any(c in url for c in (" ", "\t", "\n")):
return {"error": "web_fetch: provide a single URL or domain, e.g. example.com", "exit_code": 1}
low = url.lower()
if "://" in low and not low.startswith(("http://", "https://")):
return {"error": f"web_fetch: unsupported URL scheme (only http/https): {url[:80]}", "exit_code": 1}
# Accept bare domains like "example.com" by defaulting to https.
if not low.startswith(("http://", "https://")):
url = "https://" + url
loop = asyncio.get_running_loop()
try:
result = await asyncio.wait_for(
loop.run_in_executor(None, lambda: fetch_webpage_content(url, timeout=10)),
timeout=30,
)
except asyncio.TimeoutError:
return {"error": f"web_fetch: timed out fetching {url}", "exit_code": 1}
err = result.get("error")
text = (result.get("content") or "").strip()
title = result.get("title") or ""
if not text:
if err:
return {"error": f"web_fetch: {url}: {err}", "exit_code": 1}
# No extractable text: non-HTML body, or a pure client-rendered
# shell. The agent can fall back to the builtin_browser tool.
return {"error": f"web_fetch: {url}: no readable text content (not HTML, or the page needs JS/login)", "exit_code": 1}
header = (f"# {title}\n" if title else "") + f"Source: {url}\n\n"
output = header + text
if len(output) > MAX_OUTPUT_CHARS:
output = output[:MAX_OUTPUT_CHARS] + "\n\n[...truncated]"
return {"output": output, "exit_code": 0}
# manage_memory / generate_image still live as MCP servers
# (mcp_servers/{memory,image_gen}_server.py); the MCP path above
# handles them.

View File

@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
# Tools that are ALWAYS included regardless of retrieval results.
# These are the most commonly needed and should never be missing.
ALWAYS_AVAILABLE = frozenset({
"bash", "python", "web_search", "read_file",
"bash", "python", "web_search", "web_fetch", "read_file",
"api_call", # For configured integrations (Miniflux, Gitea, Linkding, etc.)
# The two genuinely AMBIENT cookbook tools — "what's running" and
# "kill it" can be asked any time without prior cookbook context,
@@ -62,6 +62,7 @@ BUILTIN_TOOL_DESCRIPTIONS: Dict[str, str] = {
"bash": "Run shell commands on the server. Install packages, check files, git operations, curl, system info, process management, networking.",
"python": "Execute Python code for computation, data processing, math, scripting, parsing, API calls. Not for writing code for the user.",
"web_search": "Quick single web lookup for a fact, current event, or doc mid-task. NOT for 'research X' / 'do research on X' requests — those are deep-research jobs (use trigger_research). web_search = one query; trigger_research = a full researched report in the sidebar.",
"web_fetch": "Fetch and read the text content of a specific URL/website the user names (e.g. 'check example.com', 'open this link'). Use when you have a concrete URL; for open-ended lookups use web_search instead.",
"read_file": "Read a file from disk and return its contents. View source code, config files, logs.",
"write_file": "Write content to a file on disk. Create new files, save output, update configs.",
"create_document": "Create a new document in the editor panel. For code, articles, text content longer than 15 lines. Specify title, language, and content.",

View File

@@ -95,6 +95,10 @@ _TOOL_NAME_MAP = {
"search": "web_search",
"web_search": "web_search",
"websearch": "web_search",
"web_fetch": "web_fetch",
"webfetch": "web_fetch",
"fetch_url": "web_fetch",
"fetch": "web_fetch",
"read": "read_file",
"read_file": "read_file",
"cat": "read_file",
@@ -305,6 +309,8 @@ def _parse_tool_code_block(raw: str) -> Optional[ToolBlock]:
content = xml_params.get("code", args_body)
elif mapped == "web_search":
content = xml_params.get("query", args_body)
elif mapped == "web_fetch":
content = xml_params.get("url", args_body)
elif mapped in ("read_file", "write_file"):
content = xml_params.get("path", xml_params.get("file_path", args_body))
else:

View File

@@ -64,6 +64,20 @@ FUNCTION_TOOL_SCHEMAS = [
}
}
},
{
"type": "function",
"function": {
"name": "web_fetch",
"description": "Fetch and read the text content of a specific URL the user names (e.g. 'check example.com', 'what's on this page <url>'). Use when you already have a concrete URL/domain. NOT for open-ended searches (use web_search) or 'research X' jobs (use trigger_research).",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string", "description": "The URL or domain to fetch (http/https; a bare domain like example.com is fine)"}
},
"required": ["url"]
}
}
},
{
"type": "function",
"function": {

View File

@@ -546,3 +546,79 @@ def test_mcp_config_listing_is_admin_gated():
assert "def list_servers(request: Request):" in src
assert "def list_tools(request: Request):" in src
assert "def list_server_tools(server_id: str, request: Request):" in src
# ── web_fetch SSRF guard (PR #111 merge gate) ───────────────────────
# web_fetch routes every request through src.search.content's
# _public_http_url / _get_public_url, the same SSRF-safe fetcher used by
# web_search and deep research. The tests below pin down that the guard
# blocks every private/internal address class, redirects into private
# address space, and non-http(s) schemes, so the new tool can't be turned
# into an SSRF primitive.
import ipaddress as _ipaddr
import pytest as _pytest
@_pytest.mark.parametrize(
    "url",
    [
        # IPv4 loopback, by literal and by name
        "http://127.0.0.1/",
        "http://localhost/",
        # RFC 1918 private LAN ranges: 10/8, 172.16/12, 192.168/16
        "http://10.0.0.5/",
        "http://172.16.0.1/",
        "http://192.168.1.1/",
        # Link-local / cloud metadata endpoint, by literal and by name
        "http://169.254.169.254/latest/",
        "http://metadata.google.internal/",
        # IPv6 loopback, unique-local (ULA) and link-local
        "http://[::1]/",
        "http://[fc00::1]/",
        "http://[fe80::1]/",
        # Schemes other than http/https
        "file:///etc/passwd",
        "ftp://example.com/",
    ],
)
def test_web_fetch_guard_blocks_private_and_bad_schemes(url):
    """Every internal-address URL and every non-http(s) URL is rejected.

    NOTE(review): the two hostname cases ("localhost" and
    "metadata.google.internal") presumably reach the real resolver through
    ``_resolve_hostname_ips`` -- confirm this is acceptable on CI hosts
    without working DNS.
    """
    from src.search import content as _content

    verdict = _content._public_http_url(url)
    assert verdict is False
def test_web_fetch_guard_allows_public_ip():
    """A literal public IPv4 address must be accepted by the guard."""
    from src.search import content as _content

    verdict = _content._public_http_url("http://93.184.216.34/")
    assert verdict is True
def test_web_fetch_guard_blocks_dns_resolving_to_private(monkeypatch):
    """A harmless-looking hostname that resolves to a LAN address is rejected."""
    from src.search import content

    def _resolve_to_lan(host):
        # Stub resolver: every hostname maps to a 10/8 private address.
        return [_ipaddr.ip_address("10.0.0.5")]

    monkeypatch.setattr(content, "_resolve_hostname_ips", _resolve_to_lan)

    verdict = content._public_http_url("https://innocent.example/")
    assert verdict is False
def test_web_fetch_guard_fails_closed_on_empty_resolution(monkeypatch):
    """A hostname that resolves to no addresses is treated as non-public."""
    from src.search import content

    def _resolve_to_nothing(host):
        # Stub resolver: resolution succeeds but yields zero addresses.
        return []

    monkeypatch.setattr(content, "_resolve_hostname_ips", _resolve_to_nothing)

    verdict = content._public_http_url("https://innocent.example/")
    assert verdict is False
def test_web_fetch_guard_blocks_redirect_into_private(monkeypatch):
    """A public URL that 302-redirects to an internal address is refused.

    The resolver is stubbed so the starting host looks public, and
    ``httpx.Client`` is replaced by a stub whose ``get`` always answers with
    a redirect to the cloud-metadata address. ``_get_public_url`` is expected
    to raise ``httpx.RequestError`` mentioning "non-public" instead of
    following the hop.
    """
    import httpx
    from src.search import content

    public_ip = _ipaddr.ip_address("93.184.216.34")
    monkeypatch.setattr(
        content, "_resolve_hostname_ips", lambda host: [public_ip]
    )

    class _RedirectToMetadata:
        # Minimal response stand-in: only the attributes set here exist.
        status_code = 302
        headers = {"location": "http://169.254.169.254/latest/meta-data/"}

    class _StubClient:
        # Context-manager stand-in for httpx.Client; performs no network I/O.
        def __init__(self, *args, **kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return False

        def get(self, url):
            return _RedirectToMetadata()

    monkeypatch.setattr(httpx, "Client", _StubClient)

    with _pytest.raises(httpx.RequestError) as caught:
        content._get_public_url(
            "http://public.example/start", headers={}, timeout=5
        )

    # Checked after the block exits; inside it the assert would never run.
    assert "non-public" in str(caught.value)