diff --git a/routes/memory_routes.py b/routes/memory_routes.py index 336f37e..49b49b2 100644 --- a/routes/memory_routes.py +++ b/routes/memory_routes.py @@ -39,6 +39,18 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM def _owner(request: Request) -> Optional[str]: return get_current_user(request) + def _assert_session_owner(session_obj, user): + """SECURITY: 404 if the caller does not own this session. + + SessionManager.get_session is NOT owner-scoped — it returns any + session by id. These routes accept a caller-supplied session id, so + without this gate a user could target another tenant's session and + leak their chat history, their session-scoped LLM credentials, or the + session title. Mirrors session_routes / webhook_routes ownership. + """ + if user is not None and getattr(session_obj, "owner", None) != user: + raise HTTPException(404, "Session not found") + def _verify_memory_owner(memory: dict, user: Optional[str]): """Raise 404 if user doesn't own this memory. @@ -161,12 +173,12 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM @router.get("/by-session/{session_id}") def get_memory_by_session(request: Request, session_id: str): """Get all memories associated with a specific session.""" + user = _owner(request) try: - session_manager.get_session(session_id) + _session_obj = session_manager.get_session(session_id) except KeyError: raise HTTPException(404, f"Session {session_id} not found") - - user = _owner(request) + _assert_session_owner(_session_obj, user) memories = memory_manager.load(owner=user) session_memories = [m for m in memories if m.get("session_id") == session_id] @@ -196,6 +208,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM sess = session_manager.get_session(session) except KeyError: raise HTTPException(404, "Session not found") + _assert_session_owner(sess, _owner(request)) system_msg = { "role": "system", @@ -277,6 +290,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM if not endpoint_url and session: try: sess = session_manager.get_session(session) + _assert_session_owner(sess, _owner(request)) endpoint_url = sess.endpoint_url model = sess.model headers = sess.headers @@ -327,6 +341,7 @@ def setup_memory_routes(memory_manager: MemoryManager, session_manager: SessionM if session: try: sess = session_manager.get_session(session) + _assert_session_owner(sess, _owner(request)) endpoint_url = sess.endpoint_url model = sess.model headers = sess.headers diff --git a/tests/test_memory_routes_session_owner.py b/tests/test_memory_routes_session_owner.py new file mode 100644 index 0000000..8e57332 --- /dev/null +++ b/tests/test_memory_routes_session_owner.py @@ -0,0 +1,61 @@ +"""Memory routes must owner-scope caller-supplied session ids. + +SessionManager.get_session returns any session by id (no owner scoping). The +/api/memory extract, audit, import, and by-session handlers accept a +caller-supplied session id, so without an ownership gate a user could target +another tenant's session and leak their chat history, session-scoped LLM +credentials, or session title. +""" +import asyncio +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest +from fastapi import HTTPException + +import routes.memory_routes as mr + + +def _route(router, path, method): + for r in router.routes: + if r.path == path and method in getattr(r, "methods", set()): + return r.endpoint + raise AssertionError(path) + + +def _router(monkeypatch, caller): + monkeypatch.setattr(mr, "get_current_user", lambda request: caller, raising=False) + monkeypatch.setattr(mr, "require_user", lambda request: caller, raising=False) + sm = MagicMock() + sm.sessions = {} + sm.get_session = lambda sid: SimpleNamespace( + owner="alice", name="Secret project", endpoint_url="http://x", model="m", + headers={"Authorization": "Bearer victim-secret"}, + get_context_messages=lambda: [], + ) + mem = MagicMock() + mem.load = lambda owner=None: [] + return mr.setup_memory_routes(mem, sm) + + +def test_extract_rejects_other_users_session(monkeypatch): + router = _router(monkeypatch, caller="bob") + extract = _route(router, "/api/memory/extract", "POST") + with pytest.raises(HTTPException) as exc: + asyncio.run(extract(request=None, session="alice-sess")) + assert exc.value.status_code == 404 + + +def test_by_session_rejects_other_users_session(monkeypatch): + router = _router(monkeypatch, caller="bob") + gbs = _route(router, "/api/memory/by-session/{session_id}", "GET") + with pytest.raises(HTTPException) as exc: + gbs(request=None, session_id="alice-sess") + assert exc.value.status_code == 404 + + +def test_owner_can_access_own_session(monkeypatch): + router = _router(monkeypatch, caller="alice") + gbs = _route(router, "/api/memory/by-session/{session_id}", "GET") + out = gbs(request=None, session_id="alice-sess") + assert out["session_name"] == "Secret project"