diff --git a/tests/test_api_token_routes.py b/tests/test_api_token_routes.py new file mode 100644 index 0000000..611324e --- /dev/null +++ b/tests/test_api_token_routes.py @@ -0,0 +1,294 @@ +"""Tests for API token CRUD route handlers. + +Covers GET /api/tokens, POST /api/tokens, DELETE /api/tokens/{token_id}. +Uses direct endpoint extraction from setup_api_token_routes().routes and +fake objects only — no real DB, no network, no external services. +""" + +import contextlib +import datetime +import secrets as _secrets_mod +import sys +import types +import uuid as _uuid_mod +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from fastapi import HTTPException + + +# --------------------------------------------------------------------------- +# Fixture: install per-test stubs via monkeypatch so they are torn down +# automatically and never leak into sibling tests in the same pytest session. +# --------------------------------------------------------------------------- + + +@pytest.fixture +def token_routes_mod(monkeypatch): + """Yield routes.api_token_routes imported under isolated module stubs. + + Two stubs are required: + - python_multipart: FastAPI validates Form() params at router-registration + time and raises RuntimeError when the package is absent. + - core.database: the real module declares SQLAlchemy ORM models at import + time; the conftest sqlalchemy stubs cause a metaclass conflict. + + Both are installed with monkeypatch.setitem so they are restored after + each test without touching any other test's module state. + """ + # python-multipart stub + mp_stub = types.ModuleType("python_multipart") + mp_stub.__version__ = "0.0.13" + monkeypatch.setitem(sys.modules, "python_multipart", mp_stub) + + # core.database stub: __getattr__ resolves any ORM name to a MagicMock + class _DBStub(types.ModuleType): + def __getattr__(self, name): + return MagicMock() + + @contextlib.contextmanager + def _noop_db_session(): + yield MagicMock() + + db_stub = _DBStub("core.database") + db_stub.get_db_session = _noop_db_session + db_stub.ApiToken = MagicMock() + monkeypatch.setitem(sys.modules, "core.database", db_stub) + + # Force a fresh import so the route module binds to the stubbed core.database + monkeypatch.delitem(sys.modules, "routes.api_token_routes", raising=False) + + import routes.api_token_routes as mod # noqa: PLC0415 + return mod + + +# --------------------------------------------------------------------------- +# Pure helpers — no module-level side effects +# --------------------------------------------------------------------------- + + +def _admin_mgr(is_admin: bool): + return SimpleNamespace(is_admin=lambda u: is_admin, is_configured=True) + + +def _req(current_user: str, *, is_admin: bool = False, invalidator=None): + app_state = SimpleNamespace(auth_manager=_admin_mgr(is_admin)) + if invalidator is not None: + app_state.invalidate_token_cache = invalidator + return SimpleNamespace( + state=SimpleNamespace(current_user=current_user), + headers={}, + app=SimpleNamespace(state=app_state), + ) + + +def _get_handler(mod, method: str, path_pattern: str): + """Extract a route endpoint from setup_api_token_routes() by method and path fragment.""" + router = mod.setup_api_token_routes() + for route in router.routes: + path = getattr(route, "path", "") + methods = getattr(route, "methods", None) or set() + if path_pattern in path and method.upper() in methods: + return route.endpoint + raise KeyError(f"No {method} route matching '{path_pattern}'") + + +@contextlib.contextmanager +def _db_ctx(session): + yield session + + +# --------------------------------------------------------------------------- +# 1. Admin gate — all three endpoints reject non-admin callers +# --------------------------------------------------------------------------- + + +def test_api_token_routes_require_admin_for_list_create_delete(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + + list_tokens = _get_handler(mod, "GET", "/tokens") + create_token = _get_handler(mod, "POST", "/tokens") + delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}") + + non_admin = _req("bob", is_admin=False) + + for handler, kwargs in [ + (list_tokens, {"request": non_admin}), + (create_token, {"request": non_admin, "name": "my-token"}), + (delete_token, {"request": non_admin, "token_id": "abc12345"}), + ]: + with pytest.raises(HTTPException) as exc: + handler(**kwargs) + assert exc.value.status_code == 403 + + +# --------------------------------------------------------------------------- +# 2. POST /api/tokens — owner attribution, hashed at rest, raw returned once +# --------------------------------------------------------------------------- + + +def test_create_token_attributes_owner_hashes_secret_and_returns_raw_once(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + + fake_suffix = "FAKESUFFIX_XXXXXXXXXXXXXXXXXXXXXXXXXX" + fake_uuid_str = "abcd1234-0000-0000-0000-000000000000" + fake_hash = b"$2b$12$FAKEHASHVALUE" + + monkeypatch.setattr(_secrets_mod, "token_urlsafe", lambda n: fake_suffix) + + class _FakeUUID: + def __str__(self): + return fake_uuid_str + + monkeypatch.setattr(_uuid_mod, "uuid4", _FakeUUID) + + fake_bcrypt = SimpleNamespace( + hashpw=lambda pw, salt: fake_hash, + gensalt=lambda: b"fakesalt", + ) + monkeypatch.setattr(mod, "bcrypt", fake_bcrypt) + + captured = {} + + class _FakeApiToken: + def __init__(self, **kw): + captured.clear() + captured.update(kw) + self.__dict__.update(kw) + + fake_session = MagicMock() + monkeypatch.setattr(mod, "ApiToken", _FakeApiToken) + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + + invalidator = MagicMock() + req = _req("alice", is_admin=True, invalidator=invalidator) + create_token = _get_handler(mod, "POST", "/tokens") + resp = create_token(request=req, name="my-token") + + expected_raw = "ody_" + fake_suffix + expected_prefix = expected_raw[:8] + expected_id = fake_uuid_str[:8] + + assert resp["token"] == expected_raw + assert resp["token"].startswith("ody_") + assert resp["token_prefix"] == expected_prefix + assert resp["id"] == expected_id + assert resp["owner"] == "alice" + assert resp["scopes"] == ["chat"] + + assert captured["owner"] == "alice" + assert captured["scopes"] == "chat" + assert captured["is_active"] is True + assert captured["token_hash"] == fake_hash.decode() + assert captured["token_hash"] != expected_raw + assert captured["token_prefix"] == expected_prefix + + invalidator.assert_called_once() + + +# --------------------------------------------------------------------------- +# 3. GET /api/tokens — safe display fields only, no hash or raw token +# --------------------------------------------------------------------------- + + +def test_list_tokens_returns_safe_display_fields_only(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + + row1 = SimpleNamespace( + id="tok001", + name="Production", + owner="alice", + token_prefix="ody_prod", + token_hash="$2b$12$SHOULDNEVERAPPEAR", + scopes="chat,research", + is_active=True, + last_used_at=datetime.datetime(2024, 1, 15, 10, 0), + created_at=datetime.datetime(2024, 1, 1, 0, 0), + ) + # Empty scopes should default to ["chat"] + row2 = SimpleNamespace( + id="tok002", + name="Empty scopes", + owner="bob", + token_prefix="ody_empt", + token_hash="$2b$12$ALSONEVERSHOWN", + scopes="", + is_active=False, + last_used_at=None, + created_at=datetime.datetime(2024, 2, 1, 0, 0), + ) + + fake_session = MagicMock() + fake_session.query.return_value.all.return_value = [row1, row2] + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + req = _req("alice", is_admin=True) + list_tokens = _get_handler(mod, "GET", "/tokens") + result = list_tokens(request=req) + + assert len(result) == 2 + + safe_fields = {"id", "name", "owner", "token_prefix", "scopes", "is_active", "last_used_at", "created_at"} + for item in result: + assert set(item.keys()) == safe_fields + assert "token" not in item + assert "token_hash" not in item + + assert result[0]["scopes"] == ["chat", "research"] + assert result[1]["scopes"] == ["chat"] + + +# --------------------------------------------------------------------------- +# 4. DELETE /api/tokens/{id} — found → deleted + cache invalidated +# --------------------------------------------------------------------------- + + +def test_delete_token_deletes_and_invalidates_cache(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + monkeypatch.setattr(mod, "ApiToken", MagicMock()) + + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.delete.return_value = 1 + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + invalidator = MagicMock() + req = _req("alice", is_admin=True, invalidator=invalidator) + delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}") + resp = delete_token(request=req, token_id="abcd1234") + + assert resp == {"status": "deleted"} + invalidator.assert_called_once() + + +# --------------------------------------------------------------------------- +# 5. DELETE /api/tokens/{id} — not found → 404, cache NOT invalidated +# --------------------------------------------------------------------------- + + +def test_delete_missing_token_returns_404_without_invalidating_cache(monkeypatch, token_routes_mod): + monkeypatch.setenv("AUTH_ENABLED", "true") + mod = token_routes_mod + monkeypatch.setattr(mod, "get_current_user", lambda req: req.state.current_user) + monkeypatch.setattr(mod, "ApiToken", MagicMock()) + + fake_session = MagicMock() + fake_session.query.return_value.filter.return_value.delete.return_value = 0 + monkeypatch.setattr(mod, "get_db_session", lambda: _db_ctx(fake_session)) + + invalidator = MagicMock() + req = _req("alice", is_admin=True, invalidator=invalidator) + delete_token = _get_handler(mod, "DELETE", "/tokens/{token_id}") + + with pytest.raises(HTTPException) as exc: + delete_token(request=req, token_id="missing99") + assert exc.value.status_code == 404 + invalidator.assert_not_called()