fix(memory): owner-scope memory route session access

This commit is contained in:
Afonso Coutinho
2026-06-03 23:13:56 +01:00
committed by GitHub
parent c58cb067f2
commit b6607d219d
2 changed files with 79 additions and 3 deletions

View File

@@ -39,6 +39,18 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM
def _owner(request: Request) -> Optional[str]: def _owner(request: Request) -> Optional[str]:
return get_current_user(request) return get_current_user(request)
def _assert_session_owner(session_obj, user):
"""SECURITY: 404 if the caller does not own this session.
SessionManager.get_session is NOT owner-scoped — it returns any
session by id. These routes accept a caller-supplied session id, so
without this gate a user could target another tenant's session and
leak their chat history, their session-scoped LLM credentials, or the
session title. Mirrors session_routes / webhook_routes ownership.
"""
if user is not None and getattr(session_obj, "owner", None) != user:
raise HTTPException(404, "Session not found")
def _verify_memory_owner(memory: dict, user: Optional[str]): def _verify_memory_owner(memory: dict, user: Optional[str]):
"""Raise 404 if user doesn't own this memory. """Raise 404 if user doesn't own this memory.
@@ -161,12 +173,12 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM
@router.get("/by-session/{session_id}") @router.get("/by-session/{session_id}")
def get_memory_by_session(request: Request, session_id: str): def get_memory_by_session(request: Request, session_id: str):
"""Get all memories associated with a specific session.""" """Get all memories associated with a specific session."""
user = _owner(request)
try: try:
session_manager.get_session(session_id) _session_obj = session_manager.get_session(session_id)
except KeyError: except KeyError:
raise HTTPException(404, f"Session {session_id} not found") raise HTTPException(404, f"Session {session_id} not found")
_assert_session_owner(_session_obj, user)
user = _owner(request)
memories = memory_manager.load(owner=user) memories = memory_manager.load(owner=user)
session_memories = [m for m in memories if m.get("session_id") == session_id] session_memories = [m for m in memories if m.get("session_id") == session_id]
@@ -196,6 +208,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM
sess = session_manager.get_session(session) sess = session_manager.get_session(session)
except KeyError: except KeyError:
raise HTTPException(404, "Session not found") raise HTTPException(404, "Session not found")
_assert_session_owner(sess, _owner(request))
system_msg = { system_msg = {
"role": "system", "role": "system",
@@ -277,6 +290,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM
if not endpoint_url and session: if not endpoint_url and session:
try: try:
sess = session_manager.get_session(session) sess = session_manager.get_session(session)
_assert_session_owner(sess, _owner(request))
endpoint_url = sess.endpoint_url endpoint_url = sess.endpoint_url
model = sess.model model = sess.model
headers = sess.headers headers = sess.headers
@@ -327,6 +341,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM
if session: if session:
try: try:
sess = session_manager.get_session(session) sess = session_manager.get_session(session)
_assert_session_owner(sess, _owner(request))
endpoint_url = sess.endpoint_url endpoint_url = sess.endpoint_url
model = sess.model model = sess.model
headers = sess.headers headers = sess.headers

View File

@@ -0,0 +1,61 @@
"""Memory routes must owner-scope caller-supplied session ids.
SessionManager.get_session returns any session by id (no owner scoping). The
/api/memory extract, audit, import, and by-session handlers accept a
caller-supplied session id, so without an ownership gate a user could target
another tenant's session and leak their chat history, session-scoped LLM
credentials, or session title.
"""
import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from fastapi import HTTPException
import routes.memory_routes as mr
def _route(router, path, method):
for r in router.routes:
if r.path == path and method in getattr(r, "methods", set()):
return r.endpoint
raise AssertionError(path)
def _router(monkeypatch, caller):
monkeypatch.setattr(mr, "get_current_user", lambda request: caller, raising=False)
monkeypatch.setattr(mr, "require_user", lambda request: caller, raising=False)
sm = MagicMock()
sm.sessions = {}
sm.get_session = lambda sid: SimpleNamespace(
owner="alice", name="Secret project", endpoint_url="http://x", model="m",
headers={"Authorization": "Bearer victim-secret"},
get_context_messages=lambda: [],
)
mem = MagicMock()
mem.load = lambda owner=None: []
return mr.setup_memory_routes(mem, sm)
def test_extract_rejects_other_users_session(monkeypatch):
router = _router(monkeypatch, caller="bob")
extract = _route(router, "/api/memory/extract", "POST")
with pytest.raises(HTTPException) as exc:
asyncio.run(extract(request=None, session="alice-sess"))
assert exc.value.status_code == 404
def test_by_session_rejects_other_users_session(monkeypatch):
router = _router(monkeypatch, caller="bob")
gbs = _route(router, "/api/memory/by-session/{session_id}", "GET")
with pytest.raises(HTTPException) as exc:
gbs(request=None, session_id="alice-sess")
assert exc.value.status_code == 404
def test_owner_can_access_own_session(monkeypatch):
router = _router(monkeypatch, caller="alice")
gbs = _route(router, "/api/memory/by-session/{session_id}", "GET")
out = gbs(request=None, session_id="alice-sess")
assert out["session_name"] == "Secret project"