Two changes close the cross-tenant topic leak in /api/conversations/topics. The route at routes/history_routes.py:478 used get_current_user, which returns None when no auth middleware has set request.state.current_user (loopback-bypass, AUTH_ENABLED=false, or any path that short-circuits the middleware). It then forwarded owner=None to analyze_topics. The helper at src/topic_analyzer.py:21 used an 'if owner:' short-circuit in its owner filter, so the None owner took the no-filter path and the helper silently aggregated topic frequencies and per-snippet session_id, session_name, role, and snippet text across every user's sessions. analyze_topics now returns an empty result when owner is falsy. The inner short-circuit is removed because the filter is now strict by construction. The route is switched to require_user, which raises 401 when auth_manager.is_configured is True and the caller is anonymous, matching the pattern used by calendar_routes, skills_routes, and other authenticated routes. The test test_history_topics_owner_scope.py was rewritten to drive the real route through FastAPI's TestClient with a stub AuthMiddleware that mirrors the loopback-bypass branch, and now asserts a strict 401 from the route and an empty result from the helper. The previous version of the test accepted either a 200-with-empty-topics or a 401; the strict assertion means a future regression that drops the require_user wrapper or re-adds the inner short-circuit is caught immediately.
281 lines
11 KiB
Python
281 lines
11 KiB
Python
"""
|
|
Round-4 / Finding A3.1 validator.
|
|
|
|
Claim under test:
|
|
/api/conversations/topics (routes/history_routes.py:478-485) forwards
|
|
`owner=get_current_user(request)` to `analyze_topics`, and
|
|
`analyze_topics` in src/topic_analyzer.py:21-85 SKIPS the owner
|
|
filter when `owner` is falsy. Combined with the
|
|
LOCALHOST_BYPASS / trusted-loopback branch in app.py:248, an
|
|
unauthenticated loopback caller can aggregate topic counts and
|
|
per-snippet `session_id` / `session_name` / `role` / `snippet`
|
|
examples from every user's sessions.
|
|
|
|
This test pins the data flow by:
|
|
|
|
(1) Calling `analyze_topics` directly with `owner=None` against a
|
|
stub SessionManager whose `sessions` dict contains entries for
|
|
three different owners. A correctly-scoped helper MUST return
|
|
zero topics (or an empty result) when owner is None/empty,
|
|
because no caller has identified themselves.
|
|
|
|
(2) Driving the actual route through FastAPI's TestClient with an
|
|
AuthMiddleware stub that mimics the LOCALHOST_BYPASS path: the
|
|
request has no auth cookie, no bearer token, no internal-tool
|
|
header, but the middleware short-circuits BEFORE setting
|
|
`request.state.current_user`. The expected behavior is one of:
|
|
(a) 401 / 403 response, OR
|
|
(b) a response that only contains the requesting user's
|
|
topics (which for this anonymous caller is none).
|
|
|
|
If the test FAILS, the bug is REAL. If the test PASSES, the claim
|
|
is a FALSE POSITIVE.
|
|
"""
|
|
import os
|
|
import sys
|
|
import types
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_session(sid, owner, history):
|
|
"""Build a dict-shaped session that `analyze_topics` can walk."""
|
|
return {
|
|
"id": sid,
|
|
"owner": owner,
|
|
"name": f"Session {sid[:6]}",
|
|
"archived": False,
|
|
"history": history,
|
|
}
|
|
|
|
|
|
def _stub_session_manager(sessions):
|
|
"""A duck-typed SessionManager exposing the `.sessions` dict the
|
|
`analyze_topics` helper iterates over."""
|
|
return SimpleNamespace(sessions=sessions)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. Pure-function test on `analyze_topics`
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def test_analyze_topics_with_owner_none_does_not_leak_across_owners():
|
|
"""
|
|
The most important invariant: when no caller is identified (owner is
|
|
None/empty), `analyze_topics` MUST return no cross-tenant data. The
|
|
current implementation (src/topic_analyzer.py:21-39) only enters the
|
|
owner filter when `owner` is truthy, so owner=None silently scans
|
|
every session regardless of owner.
|
|
|
|
This is a stand-alone unit test of the helper. If it returns topics
|
|
for sessions whose owners are "alice", "bob", and "carol" while
|
|
`owner=None`, the filter is not strict, and the route bug is real.
|
|
"""
|
|
from src.topic_analyzer import analyze_topics
|
|
|
|
sessions = {
|
|
"s-alice-1": _make_session(
|
|
"s-alice-1", "alice",
|
|
[{"role": "user", "content": "Let's discuss AI safety."}],
|
|
),
|
|
"s-bob-1": _make_session(
|
|
"s-bob-1", "bob",
|
|
[{"role": "user", "content": "I need to fix a python bug today."}],
|
|
),
|
|
"s-carol-1": _make_session(
|
|
"s-carol-1", "carol",
|
|
[{"role": "user", "content": "Family dinner planning and health."}],
|
|
),
|
|
}
|
|
sm = _stub_session_manager(sessions)
|
|
|
|
result = analyze_topics(sm, owner=None)
|
|
|
|
# When the caller is unidentified, no cross-tenant topics may leak.
|
|
assert result["topics"] == [], (
|
|
f"analyze_topics(owner=None) leaked cross-tenant data: "
|
|
f"{[t['topic'] for t in result['topics']]}. "
|
|
f"Expected empty result so an unauthenticated loopback caller "
|
|
f"cannot aggregate other users' topic frequencies."
|
|
)
|
|
assert result["total_topics"] == 0, (
|
|
f"analyze_topics(owner=None) reported total_topics="
|
|
f"{result['total_topics']} instead of 0. Cross-tenant leakage."
|
|
)
|
|
|
|
|
|
def test_analyze_topics_with_owner_none_no_owner_attribute_session_also_safe():
|
|
"""
|
|
Even if some legacy sessions have NO `owner` key at all (pre-ownership
|
|
data, or sessions created before multi-tenant), the helper must NOT
|
|
surface them to an unauthenticated caller. The current code's
|
|
`if owner:` short-circuit means those rows ARE included in the
|
|
no-owner scan. This test pins that the leak is observable on the
|
|
data path that the route will hit.
|
|
"""
|
|
from src.topic_analyzer import analyze_topics
|
|
|
|
# Legacy-shape session: no `owner` key, ownerless topic-rich history.
|
|
legacy = _make_session(
|
|
"s-legacy-1", None,
|
|
[{"role": "user", "content": "Work meeting about a project deadline."}],
|
|
)
|
|
del legacy["owner"] # truly ownerless dict
|
|
sm = _stub_session_manager({"s-legacy-1": legacy})
|
|
|
|
result = analyze_topics(sm, owner=None)
|
|
|
|
assert result["topics"] == [], (
|
|
f"analyze_topics(owner=None) returned topics for an ownerless "
|
|
f"session: {result['topics']}. An anonymous caller should not be "
|
|
f"able to harvest topics from any session they don't own."
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. End-to-end test through FastAPI TestClient with a stubbed
|
|
# AuthMiddleware that simulates the LOCALHOST_BYPASS branch.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _build_app_with_loopback_bypass(session_manager):
|
|
"""
|
|
Build a minimal FastAPI app that:
|
|
* mounts the real `setup_history_routes(session_manager)` router,
|
|
* installs a stub `AuthMiddleware` whose `dispatch` reproduces
|
|
the LOCALHOST_BYPASS branch from app.py:248-249 (return from
|
|
dispatch *before* setting `request.state.current_user`),
|
|
* uses an `AuthManager` whose `is_configured` is True so the
|
|
non-loopback / non-bypass path would otherwise 401.
|
|
|
|
The result: the middleware trusts the request as loopback-bypass
|
|
but leaves `request.state.current_user` unset. The route then
|
|
reads `get_current_user(request)` -> None, which `analyze_topics`
|
|
treats as 'no filter' and returns cross-tenant topics.
|
|
"""
|
|
from fastapi import FastAPI
|
|
from routes.history_routes import setup_history_routes
|
|
|
|
app = FastAPI()
|
|
app.include_router(setup_history_routes(session_manager))
|
|
|
|
# Stub AuthManager so app.state.auth_manager.is_configured is True.
|
|
auth_mgr = MagicMock()
|
|
auth_mgr.is_configured = True
|
|
auth_mgr.users = {"alice": {}, "bob": {}, "carol": {}}
|
|
app.state.auth_manager = auth_mgr
|
|
|
|
# Stub BaseHTTPMiddleware that mirrors the loopback-bypass branch.
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
from starlette.requests import Request as _Req
|
|
|
|
class LoopbackBypassMiddleware(BaseHTTPMiddleware):
|
|
async def dispatch(self, request, call_next):
|
|
# Faithful reproduction of the LOCALHOST_BYPASS branch:
|
|
# `if LOCALHOST_BYPASS and _is_trusted_loopback(request):
|
|
# return await call_next(request)`
|
|
# No `request.state.current_user = ...` is set.
|
|
return await call_next(request)
|
|
|
|
# Re-register as "AuthMiddleware" to mirror the prod class name and
|
|
# make the contract obvious to the reader.
|
|
class AuthMiddleware(LoopbackBypassMiddleware):
|
|
pass
|
|
|
|
app.add_middleware(AuthMiddleware)
|
|
return app
|
|
|
|
|
|
def test_route_rejects_or_scopes_under_loopback_bypass():
|
|
"""
|
|
Drive the real route via TestClient with a stubbed AuthMiddleware
|
|
that mimics LOCALHOST_BYPASS: no `current_user` is set. The
|
|
endpoint must NOT return cross-tenant topics in the response.
|
|
"""
|
|
from fastapi.testclient import TestClient
|
|
|
|
sessions = {
|
|
"s-alice-1": _make_session(
|
|
"s-alice-1", "alice",
|
|
[{"role": "user", "content": "AI safety is a fascinating topic."}],
|
|
),
|
|
"s-bob-1": _make_session(
|
|
"s-bob-1", "bob",
|
|
[{"role": "user", "content": "I need to fix a python bug."}],
|
|
),
|
|
"s-carol-1": _make_session(
|
|
"s-carol-1", "carol",
|
|
[{"role": "user", "content": "Family dinner planning tonight."}],
|
|
),
|
|
}
|
|
sm = _stub_session_manager(sessions)
|
|
app = _build_app_with_loopback_bypass(sm)
|
|
client = TestClient(app)
|
|
|
|
# No auth cookie, no bearer token, no internal-tool header. Pretend
|
|
# to come from a real local client. The middleware bypasses auth
|
|
# exactly as app.py:248 would.
|
|
resp = client.get(
|
|
"/api/conversations/topics",
|
|
headers={"host": "127.0.0.1:8000"},
|
|
)
|
|
|
|
# Behavior under the fix: the route uses `require_user` which raises
|
|
# 401 when auth_manager is configured and the caller is anonymous,
|
|
# which is the state this test sets up. The cross-tenant leak path
|
|
# (200 with topics from other owners) must be closed.
|
|
assert resp.status_code == 401, (
|
|
f"Expected 401 from /api/conversations/topics under the loopback "
|
|
f"bypass + configured auth_manager; got {resp.status_code}. "
|
|
f"body={resp.text!r}"
|
|
)
|
|
|
|
|
|
def test_route_data_flow_on_paper():
|
|
"""
|
|
White-box check: prove the data flow on the page.
|
|
- `get_current_user(request)` returns `None` when no state is set.
|
|
- `analyze_topics(sm, owner=None)` walks sessions of all owners.
|
|
- The route forwards `owner=user` (where user may be None) to
|
|
`analyze_topics` without further checks.
|
|
This test does not exercise the route; it pins the three independent
|
|
facts the audit relies on. If any of them regresses (e.g. someone
|
|
adds a fallback in get_current_user, or changes `if owner:` to a
|
|
strict bool check), this test will start failing in a way that
|
|
makes the regression visible.
|
|
"""
|
|
from src.auth_helpers import get_current_user
|
|
from src.topic_analyzer import analyze_topics
|
|
|
|
# (a) get_current_user with no state returns None.
|
|
req = SimpleNamespace(state=SimpleNamespace())
|
|
assert get_current_user(req) is None, (
|
|
"get_current_user must return None when no middleware has set "
|
|
"request.state.current_user."
|
|
)
|
|
|
|
# (b) analyze_topics with owner=None MUST NOT walk other owners'
|
|
# sessions. The previous behavior was a cross-tenant data leak; the
|
|
# fix returns an empty result. If this assertion is inverted in a
|
|
# future regression, A3.1 is back.
|
|
sm = _stub_session_manager({
|
|
"s1": _make_session("s1", "alice",
|
|
[{"role": "user", "content": "AI safety."}]),
|
|
"s2": _make_session("s2", "bob",
|
|
[{"role": "user", "content": "Python bug."}]),
|
|
})
|
|
res = analyze_topics(sm, owner=None)
|
|
assert res["topics"] == [], (
|
|
"analyze_topics(owner=None) returned cross-tenant data — "
|
|
"Finding A3.1 regression. Expected empty result."
|
|
)
|
|
assert res["total_topics"] == 0
|