fix(auth): revoke API tokens when deleting users

* fix: revoke API bearer tokens when their owner is deleted

* Re-run CI

* Invalidate bearer-token cache on user delete so warmed cached tokens stop working
This commit is contained in:
Afonso Coutinho
2026-06-04 04:44:34 +01:00
committed by GitHub
parent 666babfd58
commit 09fe308720
4 changed files with 197 additions and 0 deletions

View File

@@ -0,0 +1,58 @@
"""Deleting a user must invalidate the bearer-token cache.
delete_user removes the user's ApiToken rows from the DB, but the bearer-auth
middleware in app.py serves from an in-memory prefix->token cache that only
rebuilds when flagged dirty (app.state.invalidate_token_cache). If the admin
delete route does not flag it, a deleted user's already-cached token keeps
authenticating until some unrelated token op or a process restart clears the
cache. The DELETE /api/auth/users handler now calls the invalidator on a
successful delete (and only then), so the next bearer request rebuilds the
cache from the DB, where the rows are already gone, and the token is rejected.
"""
import asyncio
import types
from routes.auth_routes import setup_auth_routes, DeleteUserRequest
def _handler(router):
for route in router.routes:
if getattr(route, "path", "") == "/api/auth/users" and "DELETE" in getattr(route, "methods", set()):
return route.endpoint
raise AssertionError("DELETE /api/auth/users handler not found")
def _fake_request(invalidations):
state = types.SimpleNamespace(invalidate_token_cache=lambda: invalidations.append(True))
app = types.SimpleNamespace(state=state)
return types.SimpleNamespace(cookies={"_dummy": "x"}, app=app)
def _auth_manager(delete_result):
return types.SimpleNamespace(
get_username_for_token=lambda token: "admin",
is_admin=lambda user: True,
delete_user=lambda username, requesting_user: delete_result,
)
def test_successful_delete_invalidates_cache():
invalidations = []
router = setup_auth_routes(_auth_manager(delete_result=True))
handler = _handler(router)
result = asyncio.run(handler(DeleteUserRequest(username="bob"), _fake_request(invalidations)))
assert result == {"ok": True}
assert invalidations == [True], "successful delete must flag the token cache stale"
def test_refused_delete_does_not_invalidate_cache():
invalidations = []
router = setup_auth_routes(_auth_manager(delete_result=False))
handler = _handler(router)
try:
asyncio.run(handler(DeleteUserRequest(username="admin"), _fake_request(invalidations)))
raised = False
except Exception:
raised = True
assert raised, "a refused delete should raise (HTTP 400)"
assert invalidations == [], "a refused delete must not touch the token cache"

View File

@@ -0,0 +1,116 @@
"""Deleting a user must also revoke their API bearer tokens.
Regression test: delete_user purged cookie sessions but left ApiToken
rows behind, so a deleted user could keep authenticating with an
"ody_..." bearer token forever.
"""
import contextlib
import importlib
import sys
import types
from pathlib import Path
import pytest
def _real_core_package():
root = Path(__file__).resolve().parent.parent
core_path = str(root / "core")
core = sys.modules.get("core")
if core is None:
core = types.ModuleType("core")
sys.modules["core"] = core
core.__path__ = [core_path]
if hasattr(core, "auth"):
delattr(core, "auth")
sys.modules.pop("core.auth", None)
return core
def _auth_module():
_real_core_package()
return importlib.import_module("core.auth")
class _OwnerColumn:
"""Mimics a SQLAlchemy column: ApiToken.owner == x yields a marker."""
def __eq__(self, other):
return ("owner ==", other)
def __hash__(self):
return id(self)
class _FakeApiToken:
owner = _OwnerColumn()
class _FakeQuery:
def __init__(self, recorder):
self._recorder = recorder
self._conds = []
def filter(self, *conds):
self._conds.extend(conds)
return self
def delete(self, *args, **kwargs):
self._recorder.append(list(self._conds))
return len(self._conds)
class _FakeSession:
def __init__(self, recorder):
self._recorder = recorder
def query(self, model):
assert model is _FakeApiToken
return _FakeQuery(self._recorder)
@pytest.fixture
def manager(tmp_path, monkeypatch):
auth_mod = _auth_module()
monkeypatch.setattr(auth_mod, "_hash_password", lambda password: f"hash:{password}")
monkeypatch.setattr(
auth_mod, "_verify_password", lambda password, hashed: hashed == f"hash:{password}"
)
mgr = auth_mod.AuthManager(str(tmp_path / "auth.json"))
assert mgr.create_user("admin", "secret-admin-pw", is_admin=True)
assert mgr.create_user("bob", "secret-bob-pw", is_admin=False)
return mgr
@pytest.fixture
def db_calls(monkeypatch):
calls = []
@contextlib.contextmanager
def _fake_db_session():
yield _FakeSession(calls)
db_stub = types.ModuleType("core.database")
db_stub.get_db_session = _fake_db_session
db_stub.ApiToken = _FakeApiToken
monkeypatch.setitem(sys.modules, "core.database", db_stub)
return calls
def test_delete_user_revokes_api_tokens(manager, db_calls):
assert manager.delete_user("bob", "admin") is True
assert "bob" not in manager.users
assert db_calls, "delete_user never purged ApiToken rows for the deleted user"
assert [("owner ==", "bob")] in db_calls
def test_refused_delete_leaves_tokens_alone(manager, db_calls):
assert manager.delete_user("admin", "bob") is False
assert "admin" in manager.users
assert db_calls == []
def test_unknown_user_leaves_tokens_alone(manager, db_calls):
assert manager.delete_user("ghost", "admin") is False
assert db_calls == []