fix(research): validate session_id to block path traversal

Every research endpoint interpolates session_id into filesystem paths
(Path('data/deep_research') / f'{session_id}.json') without checking
for traversal sequences. A crafted ID like '../../data/auth' reaches
arbitrary JSON files — readable via research_detail (which also leaks
file paths in error messages), writable via research_archive, and
deletable via research_delete.

Add _validate_session_id() which rejects anything outside
[a-zA-Z0-9-]{1,128}. Called before filesystem access in all 12
endpoints that accept a session_id path parameter.
This commit is contained in:
Ernest Hysa
2026-06-01 23:25:38 +01:00
parent 7b9ef95b60
commit cb6f6b65ea
2 changed files with 74 additions and 0 deletions

View File

@@ -3,6 +3,7 @@
import asyncio import asyncio
import json import json
import logging import logging
import re
import uuid import uuid
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@@ -14,6 +15,8 @@ from pydantic import BaseModel, Field
from src.endpoint_resolver import resolve_endpoint from src.endpoint_resolver import resolve_endpoint
from src.auth_helpers import get_current_user from src.auth_helpers import get_current_user
_SESSION_ID_RE = re.compile(r"^[a-zA-Z0-9-]{1,128}$")
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Model-name substrings that are NOT chat/generation models — research must # Model-name substrings that are NOT chat/generation models — research must
@@ -58,6 +61,10 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
raise HTTPException(401, "Not authenticated") raise HTTPException(401, "Not authenticated")
return user return user
def _validate_session_id(session_id: str) -> None:
if not _SESSION_ID_RE.fullmatch(session_id):
raise HTTPException(400, "Invalid session ID format")
def _owns_in_memory(session_id: str, user: str) -> bool: def _owns_in_memory(session_id: str, user: str) -> bool:
"""Ownership check for an in-flight (in-memory) research task. """Ownership check for an in-flight (in-memory) research task.
Falls back to the on-disk JSON if the task has already finished.""" Falls back to the on-disk JSON if the task has already finished."""
@@ -95,6 +102,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
@router.get("/api/research/status/{session_id}") @router.get("/api/research/status/{session_id}")
async def research_status(session_id: str, request: Request): async def research_status(session_id: str, request: Request):
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
if not _owns_in_memory(session_id, user): if not _owns_in_memory(session_id, user):
raise HTTPException(404, "No research found for this session") raise HTTPException(404, "No research found for this session")
status = research_handler.get_status(session_id) status = research_handler.get_status(session_id)
@@ -105,6 +113,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
@router.post("/api/research/cancel/{session_id}") @router.post("/api/research/cancel/{session_id}")
async def research_cancel(session_id: str, request: Request): async def research_cancel(session_id: str, request: Request):
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
if not _owns_in_memory(session_id, user): if not _owns_in_memory(session_id, user):
raise HTTPException(404, "No research found for this session") raise HTTPException(404, "No research found for this session")
cancelled = research_handler.cancel_research(session_id) cancelled = research_handler.cancel_research(session_id)
@@ -113,6 +122,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
@router.post("/api/research/result/{session_id}") @router.post("/api/research/result/{session_id}")
async def research_result(session_id: str, request: Request): async def research_result(session_id: str, request: Request):
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
if not _owns_in_memory(session_id, user): if not _owns_in_memory(session_id, user):
raise HTTPException(404, "No research result available") raise HTTPException(404, "No research result available")
result = research_handler.get_result(session_id) result = research_handler.get_result(session_id)
@@ -140,6 +150,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_report(session_id: str, request: Request): async def research_report(session_id: str, request: Request):
"""Serve the visual HTML report for a completed research session.""" """Serve the visual HTML report for a completed research session."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
_assert_owns_research(session_id, user) _assert_owns_research(session_id, user)
logger.info(f"Visual report requested for session {session_id}") logger.info(f"Visual report requested for session {session_id}")
try: try:
@@ -160,6 +171,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
"""Mark an image URL as hidden for this research's visual report. """Mark an image URL as hidden for this research's visual report.
Persisted to the research JSON so subsequent /report renders skip it.""" Persisted to the research JSON so subsequent /report renders skip it."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
_assert_owns_research(session_id, user) _assert_owns_research(session_id, user)
ok = research_handler.hide_image(session_id, body.url) ok = research_handler.hide_image(session_id, body.url)
if not ok: if not ok:
@@ -170,6 +182,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_unhide_images(session_id: str, request: Request): async def research_unhide_images(session_id: str, request: Request):
"""Clear the hidden-images list for a research session.""" """Clear the hidden-images list for a research session."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
_assert_owns_research(session_id, user) _assert_owns_research(session_id, user)
ok = research_handler.unhide_all_images(session_id) ok = research_handler.unhide_all_images(session_id)
if not ok: if not ok:
@@ -235,6 +248,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
"""Return the full JSON for a single research result — sources, """Return the full JSON for a single research result — sources,
summary, stats — used by the Library preview panel.""" summary, stats — used by the Library preview panel."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
path = Path("data/deep_research") / f"{session_id}.json" path = Path("data/deep_research") / f"{session_id}.json"
if not path.exists(): if not path.exists():
raise HTTPException(404, "Research not found") raise HTTPException(404, "Research not found")
@@ -251,6 +265,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_archive(session_id: str, request: Request, archived: bool = Query(True)): async def research_archive(session_id: str, request: Request, archived: bool = Query(True)):
"""Soft-archive / restore a research report (sets `archived` in its JSON).""" """Soft-archive / restore a research report (sets `archived` in its JSON)."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
path = Path("data/deep_research") / f"{session_id}.json" path = Path("data/deep_research") / f"{session_id}.json"
if not path.exists(): if not path.exists():
raise HTTPException(404, "Research not found") raise HTTPException(404, "Research not found")
@@ -270,6 +285,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_delete(session_id: str, request: Request): async def research_delete(session_id: str, request: Request):
"""Delete a research result from disk.""" """Delete a research result from disk."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
data_dir = Path("data/deep_research") data_dir = Path("data/deep_research")
json_path = data_dir / f"{session_id}.json" json_path = data_dir / f"{session_id}.json"
deleted = False deleted = False
@@ -413,6 +429,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_stream(session_id: str, request: Request): async def research_stream(session_id: str, request: Request):
"""SSE stream of research progress events.""" """SSE stream of research progress events."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
if not _owns_in_memory(session_id, user): if not _owns_in_memory(session_id, user):
raise HTTPException(404, "No research found for this session") raise HTTPException(404, "No research found for this session")
async def _generate(): async def _generate():
@@ -446,6 +463,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
async def research_result_peek(session_id: str, request: Request): async def research_result_peek(session_id: str, request: Request):
"""Get research result without clearing it (for panel use).""" """Get research result without clearing it (for panel use)."""
user = _require_user(request) user = _require_user(request)
_validate_session_id(session_id)
if not _owns_in_memory(session_id, user): if not _owns_in_memory(session_id, user):
raise HTTPException(404, "No research found for this session") raise HTTPException(404, "No research found for this session")
result = research_handler.get_result(session_id) result = research_handler.get_result(session_id)
@@ -475,6 +493,7 @@ def setup_research_routes(research_handler, session_manager=None) -> APIRouter:
the user can ask follow-up questions in a clean conversation. the user can ask follow-up questions in a clean conversation.
""" """
_require_user(request) _require_user(request)
_validate_session_id(session_id)
if session_manager is None: if session_manager is None:
raise HTTPException(500, "session_manager not configured") raise HTTPException(500, "session_manager not configured")

View File

@@ -0,0 +1,55 @@
"""Regression tests: research session_id must reject path-traversal sequences."""
import re
import unittest
_SESSION_ID_RE = re.compile(r"^[a-zA-Z0-9-]{1,128}$")
class TestResearchSessionIdValidation(unittest.TestCase):
"""Validate the regex used to guard research session_id path params."""
def test_accepts_rp_prefixed_id(self):
self.assertIsNotNone(_SESSION_ID_RE.fullmatch("rp-abc123def456"))
def test_accepts_standard_uuid(self):
self.assertIsNotNone(
_SESSION_ID_RE.fullmatch("550e8400-e29b-41d4-a716-446655440000")
)
def test_accepts_custom_alphanumeric(self):
self.assertIsNotNone(_SESSION_ID_RE.fullmatch("custom-id-123"))
def test_rejects_double_dot(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch(".."))
def test_rejects_single_dot(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("."))
def test_rejects_dot_slash_traversal(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("../../data/auth"))
def test_rejects_deep_traversal(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("../../../etc/passwd"))
def test_rejects_mixed_traversal(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("normal/../../traversal"))
def test_rejects_dot_prefix_traversal(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("./../../secret"))
def test_rejects_empty(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch(""))
def test_rejects_whitespace(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch(" "))
def test_rejects_slash(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("a/b"))
def test_rejects_null_byte(self):
self.assertIsNone(_SESSION_ID_RE.fullmatch("rp-test\x00"))
if __name__ == "__main__":
unittest.main()