"""Regression tests for password-change session revocation.""" import asyncio import importlib import json import sys import types from pathlib import Path from types import SimpleNamespace from unittest.mock import MagicMock import pytest from fastapi import HTTPException def _real_core_package(): root = Path(__file__).resolve().parent.parent core_path = str(root / "core") core = sys.modules.get("core") if core is None: core = types.ModuleType("core") sys.modules["core"] = core core.__path__ = [core_path] if hasattr(core, "auth"): delattr(core, "auth") sys.modules.pop("core.auth", None) return core def _auth_module(): _real_core_package() return importlib.import_module("core.auth") def _make_manager(tmp_path): auth_mod = _auth_module() auth_mod._hash_password = lambda password: f"hash:{password}" auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}" auth_path = tmp_path / "auth.json" mgr = auth_mod.AuthManager(str(auth_path)) assert mgr.create_user("alice", "old-password", is_admin=False) assert mgr.create_user("bob", "bob-password", is_admin=False) return mgr def _sessions_on_disk(tmp_path): return json.loads((tmp_path / "sessions.json").read_text(encoding="utf-8")) def test_revoke_user_sessions_preserves_current_and_persists(tmp_path): mgr = _make_manager(tmp_path) current = mgr.create_session("alice", "old-password") other = mgr.create_session("alice", "old-password") bob = mgr.create_session("bob", "bob-password") revoked = mgr.revoke_user_sessions("alice", except_token=current) assert revoked == 1 assert mgr.validate_token(current) is True assert mgr.validate_token(other) is False assert mgr.validate_token(bob) is True persisted = _sessions_on_disk(tmp_path) assert current in persisted assert bob in persisted assert other not in persisted def test_wrong_current_password_does_not_revoke_sessions(tmp_path): mgr = _make_manager(tmp_path) current = mgr.create_session("alice", "old-password") other = mgr.create_session("alice", "old-password") assert mgr.change_password("alice", "wrong-password", "new-password") is False assert mgr.validate_token(current) is True assert mgr.validate_token(other) is True persisted = _sessions_on_disk(tmp_path) assert current in persisted assert other in persisted def test_password_change_allows_new_password_and_blocks_old_password(tmp_path): mgr = _make_manager(tmp_path) assert mgr.change_password("alice", "old-password", "new-password") is True assert mgr.create_session("alice", "old-password") is None assert mgr.create_session("alice", "new-password") is not None def _change_password_endpoint(auth_manager): sys.modules.pop("routes.auth_routes", None) _real_core_package() from routes.auth_routes import ChangePasswordRequest, setup_auth_routes router = setup_auth_routes(auth_manager) for route in router.routes: if getattr(route, "path", None) == "/api/auth/change-password": return route.endpoint, ChangePasswordRequest raise AssertionError("change-password route not found") def test_change_password_route_revokes_other_sessions_after_success(): auth = MagicMock() auth.get_username_for_token.return_value = "alice" auth.change_password.return_value = True endpoint, ChangePasswordRequest = _change_password_endpoint(auth) request = SimpleNamespace(cookies={"odysseus_session": "current-token"}) body = ChangePasswordRequest(current_password="old-password", new_password="new-password") result = asyncio.run(endpoint(body=body, request=request)) assert result == {"ok": True} auth.change_password.assert_called_once_with("alice", "old-password", "new-password") auth.revoke_user_sessions.assert_called_once_with("alice", "current-token") def test_change_password_route_wrong_password_does_not_revoke(): auth = MagicMock() auth.get_username_for_token.return_value = "alice" auth.change_password.return_value = False endpoint, ChangePasswordRequest = _change_password_endpoint(auth) request = SimpleNamespace(cookies={"odysseus_session": "current-token"}) body = ChangePasswordRequest(current_password="wrong-password", new_password="new-password") with pytest.raises(HTTPException) as exc: asyncio.run(endpoint(body=body, request=request)) assert exc.value.status_code == 400 auth.revoke_user_sessions.assert_not_called()