Harden emoji SVG proxy responses (#2842)
This commit is contained in:
@@ -16,7 +16,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
from fastapi.responses import FileResponse, Response
|
from fastapi.responses import Response
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -26,12 +26,42 @@ _CACHE_DIR = Path(__file__).resolve().parent.parent / "data" / "emoji_cache"
|
|||||||
_OPENMOJI_BASE = "https://cdn.jsdelivr.net/npm/openmoji@15.0.0/black/svg"
|
_OPENMOJI_BASE = "https://cdn.jsdelivr.net/npm/openmoji@15.0.0/black/svg"
|
||||||
# codepoints like "1f600" or "1f468-200d-1f469-200d-1f467" (lowercase hex, '-' joined)
|
# codepoints like "1f600" or "1f468-200d-1f469-200d-1f467" (lowercase hex, '-' joined)
|
||||||
_CODE_RE = re.compile(r"^[0-9a-f]{2,6}(?:-[0-9a-f]{2,6})*$")
|
_CODE_RE = re.compile(r"^[0-9a-f]{2,6}(?:-[0-9a-f]{2,6})*$")
|
||||||
_SVG_HEADERS = {"Cache-Control": "public, max-age=31536000, immutable"}
|
_MAX_SVG_BYTES = 256 * 1024
|
||||||
|
_BLOCKED_SVG_RE = re.compile(
|
||||||
|
br"<\s*(?:script|foreignObject|iframe|object|embed|image)\b|"
|
||||||
|
br"\bon[a-z0-9_-]+\s*=",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
_EXTERNAL_REF_RE = re.compile(
|
||||||
|
br"\b(?:href|xlink:href)\s*=\s*['\"](?:https?:|//|data:|javascript:)",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
_SVG_SECURITY_HEADERS = {
|
||||||
|
"X-Content-Type-Options": "nosniff",
|
||||||
|
"Content-Security-Policy": "sandbox",
|
||||||
|
"Cross-Origin-Resource-Policy": "same-origin",
|
||||||
|
}
|
||||||
|
_SVG_HEADERS = {
|
||||||
|
"Cache-Control": "public, max-age=31536000, immutable",
|
||||||
|
**_SVG_SECURITY_HEADERS,
|
||||||
|
}
|
||||||
# Returned when a codepoint is unknown/unreachable: an empty (transparent) SVG,
|
# Returned when a codepoint is unknown/unreachable: an empty (transparent) SVG,
|
||||||
# so the CSS mask renders nothing instead of a solid box. Not cached, so a later
|
# so the CSS mask renders nothing instead of a solid box. Not cached, so a later
|
||||||
# request can still pick up the real glyph once the CDN is reachable.
|
# request can still pick up the real glyph once the CDN is reachable.
|
||||||
_BLANK_SVG = b'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1 1"></svg>'
|
_BLANK_SVG = b'<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 1 1"></svg>'
|
||||||
_BLANK_HEADERS = {"Cache-Control": "no-store"}
|
_BLANK_HEADERS = {"Cache-Control": "no-store", **_SVG_SECURITY_HEADERS}
|
||||||
|
|
||||||
|
|
||||||
|
def _is_safe_svg(content: bytes) -> bool:
|
||||||
|
if not isinstance(content, bytes) or not content:
|
||||||
|
return False
|
||||||
|
if len(content) > _MAX_SVG_BYTES:
|
||||||
|
return False
|
||||||
|
if b"<svg" not in content[:256].lower():
|
||||||
|
return False
|
||||||
|
if _BLOCKED_SVG_RE.search(content) or _EXTERNAL_REF_RE.search(content):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
def setup_emoji_routes() -> APIRouter:
|
def setup_emoji_routes() -> APIRouter:
|
||||||
@@ -49,14 +79,21 @@ def setup_emoji_routes() -> APIRouter:
|
|||||||
_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||||
fp = _CACHE_DIR / f"{code}.svg"
|
fp = _CACHE_DIR / f"{code}.svg"
|
||||||
if fp.exists():
|
if fp.exists():
|
||||||
return FileResponse(fp, media_type="image/svg+xml", headers=_SVG_HEADERS)
|
try:
|
||||||
|
content = fp.read_bytes()
|
||||||
|
if _is_safe_svg(content):
|
||||||
|
return Response(content, media_type="image/svg+xml", headers=_SVG_HEADERS)
|
||||||
|
fp.unlink(missing_ok=True)
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("emoji cache read %s failed: %s", code, e)
|
||||||
|
return _blank()
|
||||||
|
|
||||||
# First time we've seen this emoji — fetch the OpenMoji black SVG + cache
|
# First time we've seen this emoji — fetch the OpenMoji black SVG + cache
|
||||||
# it. OpenMoji filenames are the codepoints uppercased.
|
# it. OpenMoji filenames are the codepoints uppercased.
|
||||||
try:
|
try:
|
||||||
async with httpx.AsyncClient(timeout=8.0) as client:
|
async with httpx.AsyncClient(timeout=8.0) as client:
|
||||||
r = await client.get(f"{_OPENMOJI_BASE}/{code.upper()}.svg")
|
r = await client.get(f"{_OPENMOJI_BASE}/{code.upper()}.svg")
|
||||||
if r.status_code == 200 and b"<svg" in r.content[:256]:
|
if r.status_code == 200 and _is_safe_svg(r.content):
|
||||||
try:
|
try:
|
||||||
fp.write_bytes(r.content)
|
fp.write_bytes(r.content)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
54
tests/test_emoji_svg_hardening.py
Normal file
54
tests/test_emoji_svg_hardening.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from routes import emoji_routes
|
||||||
|
|
||||||
|
|
||||||
|
def _emoji_endpoint():
|
||||||
|
router = emoji_routes.setup_emoji_routes()
|
||||||
|
for route in router.routes:
|
||||||
|
if route.path == "/api/emoji/{code}.svg" and "GET" in route.methods:
|
||||||
|
return route.endpoint
|
||||||
|
raise AssertionError("emoji route not found")
|
||||||
|
|
||||||
|
|
||||||
|
def test_svg_safety_rejects_active_or_external_svg_content():
|
||||||
|
assert emoji_routes._is_safe_svg(
|
||||||
|
b'<svg xmlns="http://www.w3.org/2000/svg"><path d="M0 0"/></svg>'
|
||||||
|
)
|
||||||
|
|
||||||
|
assert not emoji_routes._is_safe_svg(b'<svg><script>alert(1)</script></svg>')
|
||||||
|
assert not emoji_routes._is_safe_svg(b'<svg onload="alert(1)"></svg>')
|
||||||
|
assert not emoji_routes._is_safe_svg(b'<svg><image href="https://example.com/x.png"/></svg>')
|
||||||
|
assert not emoji_routes._is_safe_svg(b"<svg>" + b"a" * (emoji_routes._MAX_SVG_BYTES + 1))
|
||||||
|
|
||||||
|
|
||||||
|
def test_cached_svg_served_with_security_headers(tmp_path, monkeypatch):
|
||||||
|
cache_dir = tmp_path / "emoji"
|
||||||
|
cache_dir.mkdir()
|
||||||
|
monkeypatch.setattr(emoji_routes, "_CACHE_DIR", cache_dir)
|
||||||
|
content = b'<svg xmlns="http://www.w3.org/2000/svg"><path d="M0 0"/></svg>'
|
||||||
|
(cache_dir / "1f600.svg").write_bytes(content)
|
||||||
|
|
||||||
|
response = asyncio.run(_emoji_endpoint()("1f600"))
|
||||||
|
|
||||||
|
assert response.body == content
|
||||||
|
assert response.headers["cache-control"] == "public, max-age=31536000, immutable"
|
||||||
|
assert response.headers["x-content-type-options"] == "nosniff"
|
||||||
|
assert response.headers["content-security-policy"] == "sandbox"
|
||||||
|
assert response.headers["cross-origin-resource-policy"] == "same-origin"
|
||||||
|
|
||||||
|
|
||||||
|
def test_cached_active_svg_returns_blank_and_evicts_cache(tmp_path, monkeypatch):
|
||||||
|
cache_dir = tmp_path / "emoji"
|
||||||
|
cache_dir.mkdir()
|
||||||
|
monkeypatch.setattr(emoji_routes, "_CACHE_DIR", cache_dir)
|
||||||
|
cached = cache_dir / "1f600.svg"
|
||||||
|
cached.write_bytes(b'<svg onload="alert(1)"></svg>')
|
||||||
|
|
||||||
|
response = asyncio.run(_emoji_endpoint()("1f600"))
|
||||||
|
|
||||||
|
assert response.body == emoji_routes._BLANK_SVG
|
||||||
|
assert response.headers["cache-control"] == "no-store"
|
||||||
|
assert response.headers["x-content-type-options"] == "nosniff"
|
||||||
|
assert response.headers["content-security-policy"] == "sandbox"
|
||||||
|
assert not cached.exists()
|
||||||
Reference in New Issue
Block a user