diff --git a/routes/document_helpers.py b/routes/document_helpers.py index ace4cad..ebfb177 100644 --- a/routes/document_helpers.py +++ b/routes/document_helpers.py @@ -5,16 +5,16 @@ import logging import os import re -from typing import Dict, Any, Optional +from typing import Any, Dict, Optional -from fastapi import HTTPException +from fastapi import HTTPException, Request from pydantic import BaseModel from core.database import Document, DocumentVersion from core.database import Session as DbSession +from src.upload_handler import UploadHandler logger = logging.getLogger(__name__) -_UPLOAD_ID_RE = re.compile(r"^[0-9a-fA-F]{32}\.[A-Za-z0-9]+$") # ---- Request schemas ---- @@ -138,78 +138,66 @@ def _upload_path_inside(upload_dir: str, path: str) -> bool: return False -def _upload_owner_allowed( - meta: Optional[dict], - user: Optional[str], +def _resolve_user_upload_path( + upload_handler: Any, + upload_id: str, + owner: Optional[str], auth_manager=None, - allow_admin: bool = True, -) -> bool: - if not user: - return ( - not bool(auth_manager and getattr(auth_manager, "is_configured", False)) - and not (meta and meta.get("owner") is not None) +) -> Optional[str]: + """Resolve an upload id to a filesystem path the caller may read.""" + if upload_handler is None: + return None + resolved = upload_handler.resolve_upload( + upload_id, + owner=owner, + auth_manager=auth_manager, + ) + if not resolved: + return None + path = resolved.get("path") + upload_dir = getattr(upload_handler, "upload_dir", None) + if path and upload_dir and not _upload_path_inside(upload_dir, path): + logger.warning("Upload path outside upload directory: %s", path) + return None + return path + + +def _locate_upload( + upload_dir: str, + file_id: str, + owner: Optional[str] = None, + auth_manager=None, + upload_handler: Any = None, +): + """Find an upload by its filename ID via UploadHandler.resolve_upload.""" + if upload_handler is None: + from src.upload_handler import UploadHandler + + base_dir = os.path.dirname(os.path.abspath(upload_dir)) + upload_handler = UploadHandler(base_dir, upload_dir) + return _resolve_user_upload_path(upload_handler, file_id, owner, auth_manager) + + +def _assert_pdf_marker_upload_owned( + request: Request, + content: str, + user: Optional[str], + upload_handler: Any, +) -> None: + """Reject document content whose pdf_source marker points at another user's upload.""" + if upload_handler is None: + return + from src.pdf_form_doc import find_source_upload_id + + upload_id = find_source_upload_id(content or "") + if not upload_id: + return + auth_manager = getattr(getattr(request.app, "state", None), "auth_manager", None) + if not _resolve_user_upload_path(upload_handler, upload_id, user, auth_manager): + raise HTTPException( + 400, + "Document PDF marker references an upload you do not own", ) - if allow_admin and auth_manager and hasattr(auth_manager, "is_admin"): - try: - if auth_manager.is_admin(user): - return True - except Exception: - pass - return bool(meta and meta.get("owner") == user) - - -def _locate_upload(upload_dir: str, file_id: str, owner: Optional[str] = None, auth_manager=None): - """Find an upload by its filename ID. - - Lookup order: - 1. The `uploads.json` index that `UploadHandler.save_upload` maintains, - so owner can be verified before a document reads the source file. - 2. Direct hit at `upload_dir/file_id` (very small deployments). - 3. Fallback: `os.walk` the date-bucketed tree. Slow on large stores; - only allowed after the index owner check passes, or in single-user / - admin-style contexts where no owner is enforced. - - `followlinks=False` keeps a stray symlink loop in `data/uploads/` from - spinning the walker into infinite recursion. - """ - import json as _json - - if not _UPLOAD_ID_RE.fullmatch(file_id or ""): - logger.warning("Rejected invalid upload id in document lookup: %r", file_id) - return None - - meta = None - try: - idx_path = os.path.join(upload_dir, "uploads.json") - if os.path.exists(idx_path): - with open(idx_path, "r", encoding="utf-8") as f: - idx = _json.load(f) - for item in (idx.values() if isinstance(idx, dict) else []): - if isinstance(item, dict) and item.get("id") == file_id: - meta = item - break - except Exception: - meta = None - - if not _upload_owner_allowed(meta, owner, auth_manager): - logger.warning("Upload %s denied for document owner %s", file_id, owner) - return None - - if meta: - p = meta.get("path") - if p and os.path.exists(p) and _upload_path_inside(upload_dir, p): - return p - - direct = os.path.join(upload_dir, file_id) - if os.path.exists(direct) and _upload_path_inside(upload_dir, direct): - return direct - - for root, _dirs, files in os.walk(upload_dir, followlinks=False): - if file_id in files: - p = os.path.join(root, file_id) - if _upload_path_inside(upload_dir, p): - return p - return None def _derive_title(content: str) -> str: diff --git a/routes/document_routes.py b/routes/document_routes.py index 34ef30d..7d65ed3 100644 --- a/routes/document_routes.py +++ b/routes/document_routes.py @@ -20,28 +20,28 @@ from routes.document_helpers import ( DocumentCreate, DocumentUpdate, DocumentPatch, _doc_to_dict, _version_to_dict, _verify_doc_owner, _owner_session_filter, - _slug, _locate_upload, _derive_title, + _slug, _resolve_user_upload_path, _assert_pdf_marker_upload_owned, _derive_title, _PDF_RENDER_SCALE, ) -def _locate_current_user_upload(request: Request, upload_dir: str, upload_id: str, user: Optional[str]): - auth_manager = getattr(getattr(request.app, "state", None), "auth_manager", None) - return _locate_upload(upload_dir, upload_id, owner=user, auth_manager=auth_manager) - - -def _load_pdf_viewer_fitz(): - from src.pdf_runtime import load_pymupdf_for_pdf_viewer - - try: - return load_pymupdf_for_pdf_viewer() - except RuntimeError as exc: - raise HTTPException(503, str(exc)) from exc - - def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: router = APIRouter(tags=["documents"]) + def _locate_current_user_upload(request: Request, upload_id: str, user: Optional[str]): + if upload_handler is None: + return None + auth_manager = getattr(getattr(request.app, "state", None), "auth_manager", None) + return _resolve_user_upload_path(upload_handler, upload_id, user, auth_manager) + + def _load_pdf_viewer_fitz(): + from src.pdf_runtime import load_pymupdf_for_pdf_viewer + + try: + return load_pymupdf_for_pdf_viewer() + except RuntimeError as exc: + raise HTTPException(503, str(exc)) from exc + # ---- POST /api/document ---- @router.post("/api/document") async def create_document(request: Request, req: DocumentCreate) -> Dict[str, Any]: @@ -82,6 +82,8 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: if _looks_like_email_document(req.content, req.title): language = "email" + _assert_pdf_marker_upload_owned(request, req.content, user, upload_handler) + doc = Document( id=doc_id, session_id=req.session_id, @@ -176,7 +178,7 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: raise HTTPException(500, f"Upload failed: {e}") upload_id = meta["id"] - pdf_path = _locate_current_user_upload(request, UPLOAD_DIR, upload_id, user) + pdf_path = _locate_current_user_upload(request, upload_id, user) if not pdf_path: raise HTTPException(500, "Saved PDF could not be located") @@ -400,8 +402,8 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: text extraction was wired, plus for scanned/image-only PDFs where the VL model picks up text the basic pypdf path missed.""" import re - from src.constants import UPLOAD_DIR from src.document_processor import _process_pdf + from src.pdf_form_doc import find_source_upload_id user = get_current_user(request) db = SessionLocal() @@ -412,12 +414,11 @@ def setup_document_routes(session_manager, upload_handler=None) -> APIRouter: _verify_doc_owner(db, doc, user) content = doc.current_content or "" - m = re.search(r'\n\n# x\n' + assert find_source_upload_id(content) is None + + +def test_pdf_marker_write_rejects_cross_owner_upload(tmp_path, monkeypatch): + """Saving a doc whose front-matter points at another user's upload must 400.""" + from src.upload_handler import UploadHandler + + sys.modules.pop("routes.document_helpers", None) + _stub_core_database_for_route_imports(monkeypatch) + from fastapi import HTTPException + from routes.document_helpers import _assert_pdf_marker_upload_owned + + upload_dir, _alice_id, bob_id = _make_upload_store(tmp_path) + handler = UploadHandler(str(tmp_path), str(upload_dir)) + + class _AuthMgr: + is_configured = True + + @staticmethod + def is_admin(_user): + return False + + class _AppState: + auth_manager = _AuthMgr() + + class _App: + state = _AppState() + + class _Req: + app = _App() + + marker = f'\n\n# Notes\n' + with pytest.raises(HTTPException) as exc: + _assert_pdf_marker_upload_owned(_Req(), marker, "alice", handler) + assert exc.value.status_code == 400 + + # Own upload is allowed + own_marker = f'\n\n# Notes\n' + _assert_pdf_marker_upload_owned(_Req(), own_marker, "alice", handler) + + sys.modules.pop("routes.document_helpers", None) + + +def test_pdf_marker_render_lookup_denies_cross_owner_without_doc_leak(tmp_path): + """Read path: cross-owner marker resolves to None (404 at route layer).""" + from src.upload_handler import UploadHandler + + upload_dir, alice_id, bob_id = _make_upload_store(tmp_path) + handler = UploadHandler(str(tmp_path), str(upload_dir)) + + class _AuthMgr: + is_configured = True + + @staticmethod + def is_admin(_user): + return False + + assert handler.resolve_upload(bob_id, owner="alice", auth_manager=_AuthMgr()) is None + resolved = handler.resolve_upload(alice_id, owner="alice", auth_manager=_AuthMgr()) + assert resolved is not None + assert resolved["path"].endswith(alice_id) + + # ── require_user dependency rejects anon callers ──────────────── def test_require_user_rejects_unauthenticated(monkeypatch):