diff --git a/core/auth.py b/core/auth.py index 57ca97b..1e68a72 100644 --- a/core/auth.py +++ b/core/auth.py @@ -479,6 +479,22 @@ class AuthManager: self._sessions.pop(token, None) self._save_sessions() + def revoke_user_sessions(self, username: str, except_token: Optional[str] = None) -> int: + """Revoke active browser sessions for a user, optionally preserving one.""" + username = username.strip().lower() + revoked = 0 + with self._sessions_lock: + to_drop = [ + token for token, session in self._sessions.items() + if token != except_token and (session or {}).get("username") == username + ] + for token in to_drop: + self._sessions.pop(token, None) + revoked += 1 + if revoked: + self._save_sessions() + return revoked + def status(self, token: Optional[str]) -> Dict[str, Any]: username = self.get_username_for_token(token) authenticated = username is not None diff --git a/routes/auth_routes.py b/routes/auth_routes.py index 45c86ed..a817319 100644 --- a/routes/auth_routes.py +++ b/routes/auth_routes.py @@ -178,9 +178,11 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: raise HTTPException(401, "Not authenticated") if len(body.new_password) < 8: raise HTTPException(400, "Password must be at least 8 characters") + current_token = request.cookies.get(SESSION_COOKIE) ok = await asyncio.to_thread(auth_manager.change_password, user, body.current_password, body.new_password) if not ok: raise HTTPException(400, "Current password is incorrect") + await asyncio.to_thread(auth_manager.revoke_user_sessions, user, current_token) return {"ok": True} # ------------------------------------------------------------------ diff --git a/tests/test_auth_session_revocation.py b/tests/test_auth_session_revocation.py new file mode 100644 index 0000000..0a1b88e --- /dev/null +++ b/tests/test_auth_session_revocation.py @@ -0,0 +1,130 @@ +"""Regression tests for password-change session revocation.""" + +import asyncio +import importlib +import json +import sys +import types +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest +from fastapi import HTTPException + + +def _real_core_package(): + root = Path(__file__).resolve().parent.parent + core_path = str(root / "core") + core = sys.modules.get("core") + if core is None: + core = types.ModuleType("core") + sys.modules["core"] = core + core.__path__ = [core_path] + if hasattr(core, "auth"): + delattr(core, "auth") + sys.modules.pop("core.auth", None) + return core + + +def _auth_module(): + _real_core_package() + return importlib.import_module("core.auth") + + +def _make_manager(tmp_path): + auth_mod = _auth_module() + auth_mod._hash_password = lambda password: f"hash:{password}" + auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}" + auth_path = tmp_path / "auth.json" + mgr = auth_mod.AuthManager(str(auth_path)) + assert mgr.create_user("alice", "old-password", is_admin=False) + assert mgr.create_user("bob", "bob-password", is_admin=False) + return mgr + + +def _sessions_on_disk(tmp_path): + return json.loads((tmp_path / "sessions.json").read_text(encoding="utf-8")) + + +def test_revoke_user_sessions_preserves_current_and_persists(tmp_path): + mgr = _make_manager(tmp_path) + current = mgr.create_session("alice", "old-password") + other = mgr.create_session("alice", "old-password") + bob = mgr.create_session("bob", "bob-password") + + revoked = mgr.revoke_user_sessions("alice", except_token=current) + + assert revoked == 1 + assert mgr.validate_token(current) is True + assert mgr.validate_token(other) is False + assert mgr.validate_token(bob) is True + persisted = _sessions_on_disk(tmp_path) + assert current in persisted + assert bob in persisted + assert other not in persisted + + +def test_wrong_current_password_does_not_revoke_sessions(tmp_path): + mgr = _make_manager(tmp_path) + current = mgr.create_session("alice", "old-password") + other = mgr.create_session("alice", "old-password") + + assert mgr.change_password("alice", "wrong-password", "new-password") is False + + assert mgr.validate_token(current) is True + assert mgr.validate_token(other) is True + persisted = _sessions_on_disk(tmp_path) + assert current in persisted + assert other in persisted + + +def test_password_change_allows_new_password_and_blocks_old_password(tmp_path): + mgr = _make_manager(tmp_path) + + assert mgr.change_password("alice", "old-password", "new-password") is True + + assert mgr.create_session("alice", "old-password") is None + assert mgr.create_session("alice", "new-password") is not None + + +def _change_password_endpoint(auth_manager): + sys.modules.pop("routes.auth_routes", None) + _real_core_package() + from routes.auth_routes import ChangePasswordRequest, setup_auth_routes + + router = setup_auth_routes(auth_manager) + for route in router.routes: + if getattr(route, "path", None) == "/api/auth/change-password": + return route.endpoint, ChangePasswordRequest + raise AssertionError("change-password route not found") + + +def test_change_password_route_revokes_other_sessions_after_success(): + auth = MagicMock() + auth.get_username_for_token.return_value = "alice" + auth.change_password.return_value = True + endpoint, ChangePasswordRequest = _change_password_endpoint(auth) + request = SimpleNamespace(cookies={"odysseus_session": "current-token"}) + body = ChangePasswordRequest(current_password="old-password", new_password="new-password") + + result = asyncio.run(endpoint(body=body, request=request)) + + assert result == {"ok": True} + auth.change_password.assert_called_once_with("alice", "old-password", "new-password") + auth.revoke_user_sessions.assert_called_once_with("alice", "current-token") + + +def test_change_password_route_wrong_password_does_not_revoke(): + auth = MagicMock() + auth.get_username_for_token.return_value = "alice" + auth.change_password.return_value = False + endpoint, ChangePasswordRequest = _change_password_endpoint(auth) + request = SimpleNamespace(cookies={"odysseus_session": "current-token"}) + body = ChangePasswordRequest(current_password="wrong-password", new_password="new-password") + + with pytest.raises(HTTPException) as exc: + asyncio.run(endpoint(body=body, request=request)) + + assert exc.value.status_code == 400 + auth.revoke_user_sessions.assert_not_called()