Constrain gallery filenames to image root (#2828)
This commit is contained in:
@@ -27,11 +27,32 @@ GALLERY_TRANSFORM_UPLOAD_MAX_BYTES = int(os.getenv("ODYSSEUS_GALLERY_TRANSFORM_U
|
|||||||
|
|
||||||
def _sanitize_gallery_filename(filename: str) -> str:
|
def _sanitize_gallery_filename(filename: str) -> str:
|
||||||
"""Return a local filename safe to join under generated_images."""
|
"""Return a local filename safe to join under generated_images."""
|
||||||
safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", Path(filename or "").name)[:128]
|
safe_name = re.sub(r"[^A-Za-z0-9._-]", "_", Path(str(filename or "")).name)[:128]
|
||||||
if not safe_name or safe_name in {".", ".."}:
|
if not safe_name or safe_name in {".", ".."}:
|
||||||
safe_name = uuid.uuid4().hex[:12]
|
safe_name = uuid.uuid4().hex[:12]
|
||||||
return safe_name
|
return safe_name
|
||||||
|
|
||||||
|
|
||||||
|
GALLERY_IMAGE_DIR = Path("data/generated_images")
|
||||||
|
|
||||||
|
|
||||||
|
def _gallery_image_path(filename: str) -> Path:
|
||||||
|
"""Resolve a stored gallery filename without leaving generated_images."""
|
||||||
|
if not isinstance(filename, str):
|
||||||
|
raise HTTPException(400, "Unsafe gallery filename")
|
||||||
|
safe_name = _sanitize_gallery_filename(filename)
|
||||||
|
original = str(filename or "")
|
||||||
|
root = GALLERY_IMAGE_DIR.resolve()
|
||||||
|
path = (GALLERY_IMAGE_DIR / safe_name).resolve()
|
||||||
|
try:
|
||||||
|
if os.path.commonpath([str(root), str(path)]) != str(root):
|
||||||
|
raise ValueError
|
||||||
|
except Exception:
|
||||||
|
raise HTTPException(400, "Unsafe gallery filename")
|
||||||
|
if safe_name != original:
|
||||||
|
raise HTTPException(400, "Unsafe gallery filename")
|
||||||
|
return path
|
||||||
|
|
||||||
def setup_gallery_routes() -> APIRouter:
|
def setup_gallery_routes() -> APIRouter:
|
||||||
router = APIRouter(tags=["gallery"])
|
router = APIRouter(tags=["gallery"])
|
||||||
|
|
||||||
@@ -211,7 +232,7 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
if not user or img.owner != user:
|
if not user or img.owner != user:
|
||||||
raise HTTPException(403, "Not your image")
|
raise HTTPException(403, "Not your image")
|
||||||
|
|
||||||
img_path = Path("data/generated_images") / img.filename
|
img_path = _gallery_image_path(img.filename)
|
||||||
if not img_path.exists():
|
if not img_path.exists():
|
||||||
raise HTTPException(404, "Image file not found")
|
raise HTTPException(404, "Image file not found")
|
||||||
|
|
||||||
@@ -692,11 +713,11 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
used = set()
|
used = set()
|
||||||
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
|
||||||
for img in imgs:
|
for img in imgs:
|
||||||
src = os.path.join("data", "generated_images", img.filename)
|
src = _gallery_image_path(img.filename)
|
||||||
if not os.path.exists(src):
|
if not src.exists():
|
||||||
continue
|
continue
|
||||||
ext = os.path.splitext(img.filename)[1] or ".png"
|
ext = src.suffix or ".png"
|
||||||
base = (img.prompt or "").strip() or os.path.splitext(img.filename)[0]
|
base = (img.prompt or "").strip() or src.stem
|
||||||
base = re.sub(r"[^\w\-. ]+", "", base)[:60].strip() or img.id
|
base = re.sub(r"[^\w\-. ]+", "", base)[:60].strip() or img.id
|
||||||
name = f"{base}{ext}"
|
name = f"{base}{ext}"
|
||||||
i = 1
|
i = 1
|
||||||
@@ -818,9 +839,9 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
|
|
||||||
img_filename = img.filename
|
img_filename = img.filename
|
||||||
# Remove the file from disk
|
# Remove the file from disk
|
||||||
img_path = os.path.join("data", "generated_images", img_filename)
|
img_path = _gallery_image_path(img_filename)
|
||||||
if os.path.exists(img_path):
|
if img_path.exists():
|
||||||
os.remove(img_path)
|
img_path.unlink()
|
||||||
|
|
||||||
# Soft-delete the record
|
# Soft-delete the record
|
||||||
img.is_active = False
|
img.is_active = False
|
||||||
@@ -1708,7 +1729,7 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
try:
|
try:
|
||||||
img = _get_or_404_image(db, image_id, user)
|
img = _get_or_404_image(db, image_id, user)
|
||||||
|
|
||||||
img_path = Path("data/generated_images") / img.filename
|
img_path = _gallery_image_path(img.filename)
|
||||||
if not img_path.exists():
|
if not img_path.exists():
|
||||||
raise HTTPException(404, "Image file not found")
|
raise HTTPException(404, "Image file not found")
|
||||||
|
|
||||||
@@ -1807,4 +1828,3 @@ def setup_gallery_routes() -> APIRouter:
|
|||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
return router
|
return router
|
||||||
|
|
||||||
|
|||||||
63
tests/test_gallery_filename_confinement.py
Normal file
63
tests/test_gallery_filename_confinement.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from fastapi import HTTPException
|
||||||
|
|
||||||
|
|
||||||
|
def _gallery_module():
|
||||||
|
import routes.gallery_routes as gallery_routes
|
||||||
|
return gallery_routes
|
||||||
|
|
||||||
|
|
||||||
|
def test_gallery_image_path_allows_safe_filename(tmp_path, monkeypatch):
|
||||||
|
gallery_routes = _gallery_module()
|
||||||
|
image_dir = tmp_path / "generated_images"
|
||||||
|
image_dir.mkdir()
|
||||||
|
monkeypatch.setattr(gallery_routes, "GALLERY_IMAGE_DIR", image_dir)
|
||||||
|
|
||||||
|
path = gallery_routes._gallery_image_path("abc123.png")
|
||||||
|
|
||||||
|
assert path == image_dir / "abc123.png"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("filename", ["../../secret.png", "..\\secret.png", None, 12345])
|
||||||
|
def test_gallery_image_path_rejects_unsafe_stored_filenames(tmp_path, monkeypatch, filename):
|
||||||
|
gallery_routes = _gallery_module()
|
||||||
|
image_dir = tmp_path / "generated_images"
|
||||||
|
image_dir.mkdir()
|
||||||
|
monkeypatch.setattr(gallery_routes, "GALLERY_IMAGE_DIR", image_dir)
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
gallery_routes._gallery_image_path(filename)
|
||||||
|
|
||||||
|
assert exc.value.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
def test_gallery_image_path_rejects_symlink_escape(tmp_path, monkeypatch):
|
||||||
|
gallery_routes = _gallery_module()
|
||||||
|
image_dir = tmp_path / "generated_images"
|
||||||
|
image_dir.mkdir()
|
||||||
|
outside = tmp_path / "outside.png"
|
||||||
|
outside.write_bytes(b"outside image root")
|
||||||
|
link = image_dir / "escape.png"
|
||||||
|
try:
|
||||||
|
os.symlink(outside, link)
|
||||||
|
except (AttributeError, NotImplementedError, OSError) as exc:
|
||||||
|
pytest.skip(f"symlinks unavailable: {exc}")
|
||||||
|
monkeypatch.setattr(gallery_routes, "GALLERY_IMAGE_DIR", image_dir)
|
||||||
|
|
||||||
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
gallery_routes._gallery_image_path("escape.png")
|
||||||
|
|
||||||
|
assert exc.value.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
def test_gallery_file_operations_use_confining_resolver():
|
||||||
|
source = Path("routes/gallery_routes.py").read_text(encoding="utf-8")
|
||||||
|
|
||||||
|
assert 'Path("data/generated_images") / img.filename' not in source
|
||||||
|
assert 'os.path.join("data", "generated_images", img.filename)' not in source
|
||||||
|
assert 'os.path.join("data", "generated_images", img_filename)' not in source
|
||||||
|
assert source.count("_gallery_image_path(img.filename)") >= 3
|
||||||
|
assert "_gallery_image_path(img_filename)" in source
|
||||||
Reference in New Issue
Block a user