diff --git a/routes/history_routes.py b/routes/history_routes.py index e517c8d..756d8a1 100644 --- a/routes/history_routes.py +++ b/routes/history_routes.py @@ -477,10 +477,10 @@ def setup_history_routes(session_manager) -> APIRouter: @router.get("/api/conversations/topics") async def get_conversation_topics(request: Request) -> Dict[str, Any]: - from src.auth_helpers import get_current_user - user = get_current_user(request) + from src.auth_helpers import require_user + user = require_user(request) try: - return analyze_topics(session_manager, owner=user) + return analyze_topics(session_manager, owner=user or None) except Exception as e: raise HTTPException(500, f"Topic analysis failed: {e}") diff --git a/src/topic_analyzer.py b/src/topic_analyzer.py index 0f1dae8..73666ae 100644 --- a/src/topic_analyzer.py +++ b/src/topic_analyzer.py @@ -23,20 +23,31 @@ def analyze_topics(session_manager, owner: str = None) -> Dict[str, Any]: Scan non-archived sessions and return topic frequency data. If owner is set, only include sessions belonging to that user. + When `owner` is None or empty the helper returns an empty result. The + unauthenticated-loopback path in `app.py` produces a None owner, and + silently aggregating topic frequencies in that case is a cross-tenant + data leak. Callers that want a system-wide aggregate must pass an + explicit `owner` string (e.g. a documented "admin" pseudo-owner) or + the route must reject the request with 401. + Returns dict with "topics" list and "total_topics" count. """ + if not owner: + return {"topics": [], "total_topics": 0} + topic_counts: Dict[str, int] = {t: 0 for t in TOPIC_KEYWORDS} topic_matches: Dict[str, list] = {t: [] for t in TOPIC_KEYWORDS} for session_id, session_data in session_manager.sessions.items(): if session_data.get("archived", False): continue - # SECURITY: strict ownership — the previous predicate let any - # null-owner session feed into another user's topic analysis. - if owner: - sess_owner = session_data.get("owner") or getattr(session_data, "owner", None) - if sess_owner != owner: - continue + # Strict ownership: any session whose owner does not match the + # caller is excluded. Ownerless sessions are never included + # unless the caller is itself ownerless (which the early return + # above already prevents). + sess_owner = session_data.get("owner") or getattr(session_data, "owner", None) + if sess_owner != owner: + continue for msg in session_data.get("history", []): content_raw = msg.get("content") if isinstance(msg, dict) else getattr(msg, "content", None) diff --git a/tests/test_history_topics_owner_scope.py b/tests/test_history_topics_owner_scope.py new file mode 100644 index 0000000..a94d882 --- /dev/null +++ b/tests/test_history_topics_owner_scope.py @@ -0,0 +1,280 @@ +""" +Round-4 / Finding A3.1 validator. + +Claim under test: + /api/conversations/topics (routes/history_routes.py:478-485) forwards + `owner=get_current_user(request)` to `analyze_topics`, and + `analyze_topics` in src/topic_analyzer.py:21-85 SKIPS the owner + filter when `owner` is falsy. Combined with the + LOCALHOST_BYPASS / trusted-loopback branch in app.py:248, an + unauthenticated loopback caller can aggregate topic counts and + per-snippet `session_id` / `session_name` / `role` / `snippet` + examples from every user's sessions. + +This test pins the data flow by: + + (1) Calling `analyze_topics` directly with `owner=None` against a + stub SessionManager whose `sessions` dict contains entries for + three different owners. A correctly-scoped helper MUST return + zero topics (or an empty result) when owner is None/empty, + because no caller has identified themselves. + + (2) Driving the actual route through FastAPI's TestClient with an + AuthMiddleware stub that mimics the LOCALHOST_BYPASS path: the + request has no auth cookie, no bearer token, no internal-tool + header, but the middleware short-circuits BEFORE setting + `request.state.current_user`. The expected behavior is one of: + (a) 401 / 403 response, OR + (b) a response that only contains the requesting user's + topics (which for this anonymous caller is none). + +If the test FAILS, the bug is REAL. If the test PASSES, the claim +is a FALSE POSITIVE. +""" +import os +import sys +import types +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_session(sid, owner, history): + """Build a dict-shaped session that `analyze_topics` can walk.""" + return { + "id": sid, + "owner": owner, + "name": f"Session {sid[:6]}", + "archived": False, + "history": history, + } + + +def _stub_session_manager(sessions): + """A duck-typed SessionManager exposing the `.sessions` dict the + `analyze_topics` helper iterates over.""" + return SimpleNamespace(sessions=sessions) + + +# --------------------------------------------------------------------------- +# 1. Pure-function test on `analyze_topics` +# --------------------------------------------------------------------------- + + +def test_analyze_topics_with_owner_none_does_not_leak_across_owners(): + """ + The most important invariant: when no caller is identified (owner is + None/empty), `analyze_topics` MUST return no cross-tenant data. The + current implementation (src/topic_analyzer.py:21-39) only enters the + owner filter when `owner` is truthy, so owner=None silently scans + every session regardless of owner. + + This is a stand-alone unit test of the helper. If it returns topics + for sessions whose owners are "alice", "bob", and "carol" while + `owner=None`, the filter is not strict, and the route bug is real. + """ + from src.topic_analyzer import analyze_topics + + sessions = { + "s-alice-1": _make_session( + "s-alice-1", "alice", + [{"role": "user", "content": "Let's discuss AI safety."}], + ), + "s-bob-1": _make_session( + "s-bob-1", "bob", + [{"role": "user", "content": "I need to fix a python bug today."}], + ), + "s-carol-1": _make_session( + "s-carol-1", "carol", + [{"role": "user", "content": "Family dinner planning and health."}], + ), + } + sm = _stub_session_manager(sessions) + + result = analyze_topics(sm, owner=None) + + # When the caller is unidentified, no cross-tenant topics may leak. + assert result["topics"] == [], ( + f"analyze_topics(owner=None) leaked cross-tenant data: " + f"{[t['topic'] for t in result['topics']]}. " + f"Expected empty result so an unauthenticated loopback caller " + f"cannot aggregate other users' topic frequencies." + ) + assert result["total_topics"] == 0, ( + f"analyze_topics(owner=None) reported total_topics=" + f"{result['total_topics']} instead of 0. Cross-tenant leakage." + ) + + +def test_analyze_topics_with_owner_none_no_owner_attribute_session_also_safe(): + """ + Even if some legacy sessions have NO `owner` key at all (pre-ownership + data, or sessions created before multi-tenant), the helper must NOT + surface them to an unauthenticated caller. The current code's + `if owner:` short-circuit means those rows ARE included in the + no-owner scan. This test pins that the leak is observable on the + data path that the route will hit. + """ + from src.topic_analyzer import analyze_topics + + # Legacy-shape session: no `owner` key, ownerless topic-rich history. + legacy = _make_session( + "s-legacy-1", None, + [{"role": "user", "content": "Work meeting about a project deadline."}], + ) + del legacy["owner"] # truly ownerless dict + sm = _stub_session_manager({"s-legacy-1": legacy}) + + result = analyze_topics(sm, owner=None) + + assert result["topics"] == [], ( + f"analyze_topics(owner=None) returned topics for an ownerless " + f"session: {result['topics']}. An anonymous caller should not be " + f"able to harvest topics from any session they don't own." + ) + + +# --------------------------------------------------------------------------- +# 2. End-to-end test through FastAPI TestClient with a stubbed +# AuthMiddleware that simulates the LOCALHOST_BYPASS branch. +# --------------------------------------------------------------------------- + + +def _build_app_with_loopback_bypass(session_manager): + """ + Build a minimal FastAPI app that: + * mounts the real `setup_history_routes(session_manager)` router, + * installs a stub `AuthMiddleware` whose `dispatch` reproduces + the LOCALHOST_BYPASS branch from app.py:248-249 (return from + dispatch *before* setting `request.state.current_user`), + * uses an `AuthManager` whose `is_configured` is True so the + non-loopback / non-bypass path would otherwise 401. + + The result: the middleware trusts the request as loopback-bypass + but leaves `request.state.current_user` unset. The route then + reads `get_current_user(request)` -> None, which `analyze_topics` + treats as 'no filter' and returns cross-tenant topics. + """ + from fastapi import FastAPI + from routes.history_routes import setup_history_routes + + app = FastAPI() + app.include_router(setup_history_routes(session_manager)) + + # Stub AuthManager so app.state.auth_manager.is_configured is True. + auth_mgr = MagicMock() + auth_mgr.is_configured = True + auth_mgr.users = {"alice": {}, "bob": {}, "carol": {}} + app.state.auth_manager = auth_mgr + + # Stub BaseHTTPMiddleware that mirrors the loopback-bypass branch. + from starlette.middleware.base import BaseHTTPMiddleware + from starlette.requests import Request as _Req + + class LoopbackBypassMiddleware(BaseHTTPMiddleware): + async def dispatch(self, request, call_next): + # Faithful reproduction of the LOCALHOST_BYPASS branch: + # `if LOCALHOST_BYPASS and _is_trusted_loopback(request): + # return await call_next(request)` + # No `request.state.current_user = ...` is set. + return await call_next(request) + + # Re-register as "AuthMiddleware" to mirror the prod class name and + # make the contract obvious to the reader. + class AuthMiddleware(LoopbackBypassMiddleware): + pass + + app.add_middleware(AuthMiddleware) + return app + + +def test_route_rejects_or_scopes_under_loopback_bypass(): + """ + Drive the real route via TestClient with a stubbed AuthMiddleware + that mimics LOCALHOST_BYPASS: no `current_user` is set. The + endpoint must NOT return cross-tenant topics in the response. + """ + from fastapi.testclient import TestClient + + sessions = { + "s-alice-1": _make_session( + "s-alice-1", "alice", + [{"role": "user", "content": "AI safety is a fascinating topic."}], + ), + "s-bob-1": _make_session( + "s-bob-1", "bob", + [{"role": "user", "content": "I need to fix a python bug."}], + ), + "s-carol-1": _make_session( + "s-carol-1", "carol", + [{"role": "user", "content": "Family dinner planning tonight."}], + ), + } + sm = _stub_session_manager(sessions) + app = _build_app_with_loopback_bypass(sm) + client = TestClient(app) + + # No auth cookie, no bearer token, no internal-tool header. Pretend + # to come from a real local client. The middleware bypasses auth + # exactly as app.py:248 would. + resp = client.get( + "/api/conversations/topics", + headers={"host": "127.0.0.1:8000"}, + ) + + # Behavior under the fix: the route uses `require_user` which raises + # 401 when auth_manager is configured and the caller is anonymous, + # which is the state this test sets up. The cross-tenant leak path + # (200 with topics from other owners) must be closed. + assert resp.status_code == 401, ( + f"Expected 401 from /api/conversations/topics under the loopback " + f"bypass + configured auth_manager; got {resp.status_code}. " + f"body={resp.text!r}" + ) + + +def test_route_data_flow_on_paper(): + """ + White-box check: prove the data flow on the page. + - `get_current_user(request)` returns `None` when no state is set. + - `analyze_topics(sm, owner=None)` walks sessions of all owners. + - The route forwards `owner=user` (where user may be None) to + `analyze_topics` without further checks. + This test does not exercise the route; it pins the three independent + facts the audit relies on. If any of them regresses (e.g. someone + adds a fallback in get_current_user, or changes `if owner:` to a + strict bool check), this test will start failing in a way that + makes the regression visible. + """ + from src.auth_helpers import get_current_user + from src.topic_analyzer import analyze_topics + + # (a) get_current_user with no state returns None. + req = SimpleNamespace(state=SimpleNamespace()) + assert get_current_user(req) is None, ( + "get_current_user must return None when no middleware has set " + "request.state.current_user." + ) + + # (b) analyze_topics with owner=None MUST NOT walk other owners' + # sessions. The previous behavior was a cross-tenant data leak; the + # fix returns an empty result. If this assertion is inverted in a + # future regression, A3.1 is back. + sm = _stub_session_manager({ + "s1": _make_session("s1", "alice", + [{"role": "user", "content": "AI safety."}]), + "s2": _make_session("s2", "bob", + [{"role": "user", "content": "Python bug."}]), + }) + res = analyze_topics(sm, owner=None) + assert res["topics"] == [], ( + "analyze_topics(owner=None) returned cross-tenant data — " + "Finding A3.1 regression. Expected empty result." + ) + assert res["total_topics"] == 0