"""Pin the auth-gate fixes from the 2026-05-19 v2 review so they don't regress. Specifically: - All `/api/research/*` endpoints reject anonymous callers. - Task `create_task` blocks shell-executing action types for non-admins (`run_local`, `run_script`, `ssh_command`). - `pop_notifications(owner)` returns only the calling user's notifications; ownerless legacy notifications are drained only by anonymous/no-owner callers. """ import os import sys import types import asyncio import pytest from types import SimpleNamespace from unittest.mock import MagicMock # Stub `core.database` / `core.auth` before the route modules import them. # (Same trick as test_null_owner_gates.py — the real modules instantiate # SQLAlchemy declarative classes at import-time which blow up under the # conftest's `sqlalchemy.*` MagicMock stubs.) def _ensure_stub(name: str, **attrs): """Create or augment a stub module with the given attributes. Augments existing entries because earlier-run tests may have already stubbed the same module with a different attribute set. Also stubs the parent package and wires the child onto it as an attribute. Without stubbing the parent we'd either (a) run the real `core/__init__.py`, which transitively imports SQLAlchemy-using modules and explodes under the conftest mocks, or (b) leave the stub orphaned so `import core.auth; core.auth.AuthManager` raises `AttributeError`.""" # Stub the parent package first if not already loaded. We point # `__path__` at the real on-disk directory so submodules NOT # stubbed here can still resolve via normal import machinery — # but `core/__init__.py` is bypassed because the package is # already in `sys.modules`, which is exactly what we want. if "." in name: parent_name, _, child_name = name.rpartition(".") if parent_name not in sys.modules: parent = types.ModuleType(parent_name) # Find the real on-disk path so unstubbed submodules # (core.middleware etc.) still load from disk. real_path = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), *parent_name.split("."), ) parent.__path__ = [real_path] if os.path.isdir(real_path) else [] sys.modules[parent_name] = parent else: parent = sys.modules[parent_name] else: parent = None child_name = None mod = sys.modules.get(name) if mod is None: mod = types.ModuleType(name) sys.modules[name] = mod for k, v in attrs.items(): if not hasattr(mod, k): setattr(mod, k, v) if parent is not None and not hasattr(parent, child_name): setattr(parent, child_name, mod) return mod @pytest.fixture(autouse=True) def _auth_regressions_stubs(monkeypatch): db = _ensure_stub("core.database", SessionLocal=MagicMock(), ScheduledTask=MagicMock(), TaskRun=MagicMock(), ModelEndpoint=MagicMock(), Session=MagicMock(), ChatMessage=MagicMock(), CalendarCal=MagicMock(), CalendarEvent=MagicMock(), Document=MagicMock(), DocumentVersion=MagicMock(), GalleryImage=MagicMock(), GalleryAlbum=MagicMock(), Note=MagicMock(), McpServer=MagicMock(), ) auth = _ensure_stub("core.auth", AuthManager=MagicMock()) ep = _ensure_stub("src.endpoint_resolver", resolve_endpoint=MagicMock(return_value=("", "", {})), normalize_base=MagicMock(), build_chat_url=MagicMock(), build_headers=MagicMock(), ) monkeypatch.setitem(sys.modules, "core.database", db) monkeypatch.setitem(sys.modules, "core.auth", auth) monkeypatch.setitem(sys.modules, "src.endpoint_resolver", ep) from fastapi import HTTPException # --------------------------------------------------------------------------- # Auth routes -- open signup setter # --------------------------------------------------------------------------- def _auth_route_endpoint(path: str, method: str): from routes.auth_routes import setup_auth_routes auth_manager = MagicMock() router = setup_auth_routes(auth_manager) for route in router.routes: if getattr(route, "path", "") == path and method in getattr(route, "methods", set()): return auth_manager, route.endpoint raise AssertionError(f"{method} {path} route not registered") def _fake_auth_request(token="session-token"): from routes.auth_routes import SESSION_COOKIE req = SimpleNamespace() req.cookies = {SESSION_COOKIE: token} req.client = SimpleNamespace(host="127.0.0.1") return req def test_set_signup_enabled_true_is_idempotent(): from routes.auth_routes import SetOpenRegistrationRequest auth, target = _auth_route_endpoint("/api/auth/open-signup", "PUT") auth.get_username_for_token.return_value = "admin" auth.is_admin.return_value = True request = _fake_auth_request() auth.signup_enabled = False out = asyncio.run(target(body=SetOpenRegistrationRequest(enabled=True),request=request)) assert out == {"ok": True, "signup_enabled": True} assert auth.signup_enabled is True out = asyncio.run(target(body=SetOpenRegistrationRequest(enabled=True), request=request)) assert out == {"ok": True, "signup_enabled": True} assert auth.signup_enabled is True def test_set_signup_enabled_false_is_idempotent(): from routes.auth_routes import SetOpenRegistrationRequest auth, target = _auth_route_endpoint("/api/auth/open-signup", "PUT") auth.get_username_for_token.return_value = "admin" auth.is_admin.return_value = True request = _fake_auth_request() auth.signup_enabled = True out = asyncio.run(target(body=SetOpenRegistrationRequest(enabled=False), request=request)) assert out == {"ok": True, "signup_enabled": False} assert auth.signup_enabled is False out = asyncio.run(target(body=SetOpenRegistrationRequest(enabled=False), request=request)) assert out == {"ok": True, "signup_enabled": False} assert auth.signup_enabled is False def test_set_signup_enabled_requires_admin(): from routes.auth_routes import SetOpenRegistrationRequest auth, target = _auth_route_endpoint("/api/auth/open-signup", "PUT") auth.get_username_for_token.return_value = "bob" auth.is_admin.return_value = False auth.signup_enabled = False with pytest.raises(HTTPException) as exc: asyncio.run(target(body=SetOpenRegistrationRequest(enabled=True), request=_fake_auth_request())) assert exc.value.status_code == 403 assert auth.signup_enabled is False # --------------------------------------------------------------------------- # Research endpoints — `_require_user` rejects anonymous # --------------------------------------------------------------------------- def _build_research_router(): """Construct the research router with a mock research_handler so we can fish out the inner `_require_user` helper without booting the full app.""" from routes.research_routes import setup_research_routes rh = MagicMock() setup_research_routes(rh) # The helper lives inside the setup closure. Easiest way to exercise # it: re-import the module and grab the symbol via its source. # Instead, exercise it via the route helper that has request:Request. return rh def _fake_request(user=None): """Cheap stand-in for fastapi.Request — only `request.state.current_user` matters to `get_current_user`.""" req = SimpleNamespace() req.state = SimpleNamespace(current_user=user) # Some endpoints touch .client too — provide a benign default. req.client = SimpleNamespace(host="127.0.0.1") return req def test_research_status_rejects_anonymous(): """research_status must 401 when no user is on the request state.""" # Build a fresh router and pluck its registered routes. from routes.research_routes import setup_research_routes rh = MagicMock() rh.get_status.return_value = {"status": "running"} # would 200 if auth passed router = setup_research_routes(rh) # Find the function registered for /api/research/status/{session_id} target = None for route in router.routes: if getattr(route, "path", "") == "/api/research/status/{session_id}": target = route.endpoint break assert target is not None, "research_status route not registered" with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user=None))) assert exc.value.status_code == 401 def test_research_status_accepts_authenticated(): from routes.research_routes import setup_research_routes rh = MagicMock() rh._active_tasks = {"x": {"owner": "alice", "status": "running"}} rh.get_status.return_value = {"status": "running", "progress": {}} router = setup_research_routes(rh) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/status/{session_id}") out = asyncio.run(target(session_id="x", request=_fake_request(user="alice"))) assert out == {"status": "running", "progress": {}} def test_research_status_rejects_wrong_owner(): from routes.research_routes import setup_research_routes rh = MagicMock() rh._active_tasks = {"x": {"owner": "alice", "status": "running"}} rh.get_status.return_value = {"status": "running", "progress": {}} router = setup_research_routes(rh) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/status/{session_id}") with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user="bob"))) assert exc.value.status_code == 404 def test_research_cancel_rejects_anonymous(): from routes.research_routes import setup_research_routes rh = MagicMock() router = setup_research_routes(rh) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/cancel/{session_id}") with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user=None))) assert exc.value.status_code == 401 def test_research_delete_rejects_anonymous(): from routes.research_routes import setup_research_routes rh = MagicMock() router = setup_research_routes(rh) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/{session_id}") # Note: `target` here is the most-recently registered route on this # path which is the DELETE. Either /detail or /delete both match # other paths — the {session_id} bare path is DELETE. with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user=None))) assert exc.value.status_code == 401 def test_research_spinoff_rejects_anonymous(): """spinoff must 401 before reading any research data.""" from routes.research_routes import setup_research_routes rh = MagicMock() router = setup_research_routes(rh, session_manager=MagicMock()) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/spinoff/{session_id}") with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user=None))) assert exc.value.status_code == 401 def test_research_spinoff_rejects_wrong_owner(): """A user must not be able to spin off (and thereby read) another user's research report. The ownership gate must 404 before any data is read or a new session is created. Regression for the cross-user disclosure IDOR.""" from routes.research_routes import setup_research_routes sm = MagicMock() rh = MagicMock() rh._active_tasks = {"x": {"owner": "alice"}} rh.get_result.return_value = "TOP SECRET REPORT" router = setup_research_routes(rh, session_manager=sm) target = next(r.endpoint for r in router.routes if getattr(r, "path", "") == "/api/research/spinoff/{session_id}") with pytest.raises(HTTPException) as exc: asyncio.run(target(session_id="x", request=_fake_request(user="bob"))) assert exc.value.status_code == 404 # The attacker must never get a session created on their behalf. sm.create_session.assert_not_called() # --------------------------------------------------------------------------- # pop_notifications owner filter # --------------------------------------------------------------------------- def test_pop_notifications_owner_filtered(): """pop_notifications(owner='alice') must return only alice's items. bob's and legacy ownerless items stay behind in the queue.""" # Build a minimal scheduler instance that we can hit directly. # Reuse the real class so the test catches future regressions of # the filter logic. import sys, types from unittest.mock import MagicMock as _MM # `task_scheduler` pulls in lots of helpers — stub the ones it uses. for s in ["src.builtin_actions", "src.ai_interaction", "src.endpoint_resolver", "src.agent_loop", "src.session_manager"]: if s not in sys.modules: mod = types.ModuleType(s) sys.modules[s] = mod from src.task_scheduler import TaskScheduler sch = TaskScheduler.__new__(TaskScheduler) # bypass __init__ network etc. sch._pending_notifications = [] sch.add_notification("t1", "success", "id1", owner="alice") sch.add_notification("t2", "error", "id2", owner="bob") sch.add_notification("t3", "success", "id3", owner=None) sch.add_notification("t4", "success", "id4", owner="alice") alice = sch.pop_notifications(owner="alice") alice_names = {n["task_name"] for n in alice} # alice gets only her own rows; bob's row and legacy null-owner rows stay. assert alice_names == {"t1", "t4"} # bob's row and the legacy ownerless row are still queued. remaining = sch._pending_notifications assert {n["task_name"] for n in remaining} == {"t2", "t3"} # Anonymous caller (owner=None) drains everything that's left. rest = sch.pop_notifications(owner=None) assert {n["task_name"] for n in rest} == {"t2", "t3"} assert sch._pending_notifications == [] # --------------------------------------------------------------------------- # Task action allowlist # --------------------------------------------------------------------------- def test_admin_only_actions_set_contains_shell_runners(): """The constant defining shell-executing action types must include the three risky entries. Catches accidental removal.""" from routes import task_routes # `_ADMIN_ONLY_ACTIONS` is a closure constant. Easiest pin: re-read # the source and check for the three risky entries + the admin gate # wording. src = open(task_routes.__file__, encoding="utf-8").read() assert '"run_local"' in src assert '"run_script"' in src assert '"ssh_command"' in src # And the gate is wired into both create and update paths. assert "Action '" in src and "requires admin privileges" in src def test_task_create_notification_default_allows_action_specific_defaults(): """Omitted notifications_enabled should stay None so create_task can default noisy/quiet built-ins differently.""" from routes.task_routes import TaskCreate req = TaskCreate(task_type="action", action="check_email_urgency", schedule="cron", cron_expression="*/15 * * * *") assert req.notifications_enabled is None def test_ship_paused_housekeeping_stays_paused_by_default(): """Built-ins marked ship_paused are intentionally opt-in even after the user enables the rest of Tasks.""" from routes import task_routes from src import task_scheduler route_src = open(task_routes.__file__, encoding="utf-8").read() scheduler_src = open(task_scheduler.__file__, encoding="utf-8").read() assert '"ship_paused": True' in scheduler_src assert 'defs.get("ship_paused")' in scheduler_src assert 'defs.get("ship_paused")' in route_src def test_task_payload_exposes_crew_member_id_for_ui_category(): from routes import task_routes src = open(task_routes.__file__, encoding="utf-8").read() assert '"crew_member_id"' in src