From 09fe30872038fe25ad9c464b66fa4486c831ef8f Mon Sep 17 00:00:00 2001 From: Afonso Coutinho Date: Thu, 4 Jun 2026 04:44:34 +0100 Subject: [PATCH] fix(auth): revoke API tokens when deleting users * fix: revoke API bearer tokens when their owner is deleted * Re-run CI * Invalidate bearer-token cache on user delete so warmed cached tokens stop working --- core/auth.py | 12 ++ routes/auth_routes.py | 11 ++ ...est_delete_user_invalidates_token_cache.py | 58 +++++++++ tests/test_delete_user_revokes_api_tokens.py | 116 ++++++++++++++++++ 4 files changed, 197 insertions(+) create mode 100644 tests/test_delete_user_invalidates_token_cache.py create mode 100644 tests/test_delete_user_revokes_api_tokens.py diff --git a/core/auth.py b/core/auth.py index 54635d8..d4f5d36 100644 --- a/core/auth.py +++ b/core/auth.py @@ -241,6 +241,18 @@ class AuthManager: revoked += 1 if revoked: self._save_sessions() + # Also revoke API bearer tokens owned by this user. The bearer auth + # path authenticates straight against ApiToken rows and never + # re-checks that the owner still exists, so leaving the rows behind + # would let a deleted user keep full API access indefinitely. + try: + from core.database import get_db_session, ApiToken + with get_db_session() as db: + removed = db.query(ApiToken).filter(ApiToken.owner == username).delete() + if removed: + logger.info(f"Revoked {removed} API token(s) owned by deleted user '{username}'") + except Exception: + logger.warning(f"Failed to revoke API tokens for deleted user '{username}'") logger.info(f"Deleted user '{username}' (by {requesting_user}); revoked {revoked} active session(s)") return True diff --git a/routes/auth_routes.py b/routes/auth_routes.py index 5728d3e..1992f8c 100644 --- a/routes/auth_routes.py +++ b/routes/auth_routes.py @@ -375,6 +375,17 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: ok = auth_manager.delete_user(body.username, user) if not ok: raise HTTPException(400, "Cannot delete user") + # delete_user removes the user's ApiToken rows, but the bearer-auth + # middleware serves from an in-memory prefix->token cache that only + # rebuilds when flagged dirty. Without this, a deleted user's already + # cached token keeps authenticating until some other token op or a + # restart clears the cache. Mirror what the token routes do. + try: + invalidator = getattr(request.app.state, "invalidate_token_cache", None) + if invalidator: + invalidator() + except Exception: + pass return {"ok": True} # ---- Feature visibility (admin-managed) ---- diff --git a/tests/test_delete_user_invalidates_token_cache.py b/tests/test_delete_user_invalidates_token_cache.py new file mode 100644 index 0000000..c9cb79a --- /dev/null +++ b/tests/test_delete_user_invalidates_token_cache.py @@ -0,0 +1,58 @@ +"""Deleting a user must invalidate the bearer-token cache. + +delete_user removes the user's ApiToken rows from the DB, but the bearer-auth +middleware in app.py serves from an in-memory prefix->token cache that only +rebuilds when flagged dirty (app.state.invalidate_token_cache). If the admin +delete route does not flag it, a deleted user's already-cached token keeps +authenticating until some unrelated token op or a process restart clears the +cache. The DELETE /api/auth/users handler now calls the invalidator on a +successful delete (and only then), so the next bearer request rebuilds the +cache from the DB, where the rows are already gone, and the token is rejected. +""" +import asyncio +import types + +from routes.auth_routes import setup_auth_routes, DeleteUserRequest + + +def _handler(router): + for route in router.routes: + if getattr(route, "path", "") == "/api/auth/users" and "DELETE" in getattr(route, "methods", set()): + return route.endpoint + raise AssertionError("DELETE /api/auth/users handler not found") + + +def _fake_request(invalidations): + state = types.SimpleNamespace(invalidate_token_cache=lambda: invalidations.append(True)) + app = types.SimpleNamespace(state=state) + return types.SimpleNamespace(cookies={"_dummy": "x"}, app=app) + + +def _auth_manager(delete_result): + return types.SimpleNamespace( + get_username_for_token=lambda token: "admin", + is_admin=lambda user: True, + delete_user=lambda username, requesting_user: delete_result, + ) + + +def test_successful_delete_invalidates_cache(): + invalidations = [] + router = setup_auth_routes(_auth_manager(delete_result=True)) + handler = _handler(router) + result = asyncio.run(handler(DeleteUserRequest(username="bob"), _fake_request(invalidations))) + assert result == {"ok": True} + assert invalidations == [True], "successful delete must flag the token cache stale" + + +def test_refused_delete_does_not_invalidate_cache(): + invalidations = [] + router = setup_auth_routes(_auth_manager(delete_result=False)) + handler = _handler(router) + try: + asyncio.run(handler(DeleteUserRequest(username="admin"), _fake_request(invalidations))) + raised = False + except Exception: + raised = True + assert raised, "a refused delete should raise (HTTP 400)" + assert invalidations == [], "a refused delete must not touch the token cache" diff --git a/tests/test_delete_user_revokes_api_tokens.py b/tests/test_delete_user_revokes_api_tokens.py new file mode 100644 index 0000000..3d646c7 --- /dev/null +++ b/tests/test_delete_user_revokes_api_tokens.py @@ -0,0 +1,116 @@ +"""Deleting a user must also revoke their API bearer tokens. + +Regression test: delete_user purged cookie sessions but left ApiToken +rows behind, so a deleted user could keep authenticating with an +"ody_..." bearer token forever. +""" + +import contextlib +import importlib +import sys +import types +from pathlib import Path + +import pytest + + +def _real_core_package(): + root = Path(__file__).resolve().parent.parent + core_path = str(root / "core") + core = sys.modules.get("core") + if core is None: + core = types.ModuleType("core") + sys.modules["core"] = core + core.__path__ = [core_path] + if hasattr(core, "auth"): + delattr(core, "auth") + sys.modules.pop("core.auth", None) + return core + + +def _auth_module(): + _real_core_package() + return importlib.import_module("core.auth") + + +class _OwnerColumn: + """Mimics a SQLAlchemy column: ApiToken.owner == x yields a marker.""" + + def __eq__(self, other): + return ("owner ==", other) + + def __hash__(self): + return id(self) + + +class _FakeApiToken: + owner = _OwnerColumn() + + +class _FakeQuery: + def __init__(self, recorder): + self._recorder = recorder + self._conds = [] + + def filter(self, *conds): + self._conds.extend(conds) + return self + + def delete(self, *args, **kwargs): + self._recorder.append(list(self._conds)) + return len(self._conds) + + +class _FakeSession: + def __init__(self, recorder): + self._recorder = recorder + + def query(self, model): + assert model is _FakeApiToken + return _FakeQuery(self._recorder) + + +@pytest.fixture +def manager(tmp_path, monkeypatch): + auth_mod = _auth_module() + monkeypatch.setattr(auth_mod, "_hash_password", lambda password: f"hash:{password}") + monkeypatch.setattr( + auth_mod, "_verify_password", lambda password, hashed: hashed == f"hash:{password}" + ) + mgr = auth_mod.AuthManager(str(tmp_path / "auth.json")) + assert mgr.create_user("admin", "secret-admin-pw", is_admin=True) + assert mgr.create_user("bob", "secret-bob-pw", is_admin=False) + return mgr + + +@pytest.fixture +def db_calls(monkeypatch): + calls = [] + + @contextlib.contextmanager + def _fake_db_session(): + yield _FakeSession(calls) + + db_stub = types.ModuleType("core.database") + db_stub.get_db_session = _fake_db_session + db_stub.ApiToken = _FakeApiToken + monkeypatch.setitem(sys.modules, "core.database", db_stub) + return calls + + +def test_delete_user_revokes_api_tokens(manager, db_calls): + assert manager.delete_user("bob", "admin") is True + assert "bob" not in manager.users + assert db_calls, "delete_user never purged ApiToken rows for the deleted user" + assert [("owner ==", "bob")] in db_calls + + +def test_refused_delete_leaves_tokens_alone(manager, db_calls): + assert manager.delete_user("admin", "bob") is False + assert "admin" in manager.users + assert db_calls == [] + + +def test_unknown_user_leaves_tokens_alone(manager, db_calls): + assert manager.delete_user("ghost", "admin") is False + assert db_calls == []