diff --git a/core/auth.py b/core/auth.py index 1e68a72..0ed140f 100644 --- a/core/auth.py +++ b/core/auth.py @@ -266,7 +266,8 @@ class AuthManager: renamed_sessions = 0 with self._sessions_lock: for sess in self._sessions.values(): - if (sess or {}).get("username") == old_username: + sess_user = str((sess or {}).get("username") or "").strip().lower() + if sess_user == old_username: sess["username"] = new_username renamed_sessions += 1 if renamed_sessions: diff --git a/routes/auth_routes.py b/routes/auth_routes.py index 2f9b0b1..5728d3e 100644 --- a/routes/auth_routes.py +++ b/routes/auth_routes.py @@ -297,6 +297,7 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: # owner-scoped DB rows before changing auth so the account keeps # access to its sessions, docs, email accounts, tasks, etc. try: + from sqlalchemy import func from core.database import Base, SessionLocal db = SessionLocal() try: @@ -306,7 +307,7 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: continue ( db.query(model) - .filter(model.owner == old_username) + .filter(func.lower(model.owner) == old_username) .update({"owner": new_username}, synchronize_session=False) ) db.commit() @@ -324,9 +325,15 @@ def setup_auth_routes(auth_manager: AuthManager) -> APIRouter: from routes.prefs_routes import _load as _load_prefs, _save as _save_prefs prefs = _load_prefs() users = prefs.get("_users") if isinstance(prefs, dict) else None - if isinstance(users, dict) and old_username in users and new_username not in users: - users[new_username] = users.pop(old_username) - _save_prefs(prefs) + if isinstance(users, dict): + prefs_key = next( + (k for k in users if str(k).strip().lower() == old_username), + None, + ) + new_taken = any(str(k).strip().lower() == new_username for k in users) + if prefs_key is not None and not new_taken: + users[new_username] = users.pop(prefs_key) + _save_prefs(prefs) except Exception as e: logger.warning("Failed to rename user prefs %s -> %s: %s", old_username, new_username, e) diff --git a/tests/test_rename_user_case_insensitive.py b/tests/test_rename_user_case_insensitive.py new file mode 100644 index 0000000..624bc87 --- /dev/null +++ b/tests/test_rename_user_case_insensitive.py @@ -0,0 +1,86 @@ +"""Regression: username rename must migrate mixed-case legacy owner keys. + +Before lowercasing was enforced everywhere, rows could be stored with +owner='Admin' while auth usernames are normalized to 'admin'. A case- +sensitive filter would skip those rows during rename (issue #1165). +""" + +import importlib +import sys +import time +import types +from pathlib import Path +from unittest.mock import MagicMock + +import pytest + + +def _real_core_package(): + root = Path(__file__).resolve().parent.parent + core_path = str(root / "core") + core = sys.modules.get("core") + if core is None: + core = types.ModuleType("core") + sys.modules["core"] = core + core.__path__ = [core_path] + if hasattr(core, "auth"): + delattr(core, "auth") + sys.modules.pop("core.auth", None) + return core + + +def _fresh_auth_manager(tmp_path): + auth_mod = importlib.import_module("core.auth", package=_real_core_package()) + auth_mod._hash_password = lambda password: f"hash:{password}" + auth_mod._verify_password = lambda password, hashed: hashed == f"hash:{password}" + return auth_mod.AuthManager(str(tmp_path / "auth.json")) + + +def test_rename_user_updates_mixed_case_session_username(tmp_path): + mgr = _fresh_auth_manager(tmp_path) + assert mgr.create_user("admin", "pw-123456", is_admin=True) is True + assert mgr.create_user("bob", "pw-123456") is True + with mgr._sessions_lock: + mgr._sessions["tok1"] = {"username": "Bob", "expiry": time.time() + 3600} + assert mgr.rename_user("bob", "robert", "admin") is True + with mgr._sessions_lock: + assert mgr._sessions["tok1"]["username"] == "robert" + + +def _has_real_sqlalchemy(): + mod = sys.modules.get("sqlalchemy") + if mod is None or isinstance(mod, MagicMock): + return False + return hasattr(mod, "create_engine") + + +@pytest.mark.skipif(not _has_real_sqlalchemy(), reason="sqlalchemy not installed") +def test_rename_owner_db_filter_is_case_insensitive(): + from sqlalchemy import create_engine, func + from sqlalchemy.orm import sessionmaker + + from core.database import Base, Session as DbSession + + engine = create_engine("sqlite:///:memory:") + Base.metadata.create_all(bind=engine) + db = sessionmaker(bind=engine)() + db.add( + DbSession( + id="s1", + name="chat", + endpoint_url="http://localhost:8000", + model="gpt-4", + owner="Bob", + ) + ) + db.commit() + + old_username = "bob" + new_username = "robert" + db.query(DbSession).filter(func.lower(DbSession.owner) == old_username).update( + {"owner": new_username}, + synchronize_session=False, + ) + db.commit() + + assert db.query(DbSession).first().owner == "robert"