From 0a7de1fdf4ee9ec086f6c73124500981adcd3d9e Mon Sep 17 00:00:00 2001 From: Collin <89503725+CollinOS@users.noreply.github.com> Date: Mon, 1 Jun 2026 00:57:48 -0400 Subject: [PATCH] fix: stop leaking DB connections when persisting session mode (#64) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit chat_routes.py persisted a session's "mode" in three best-effort spots — reading the current mode, writing the effective mode, and setting research_pending on the stream path. Each opened a session with SessionLocal() and called .close() as the LAST statement inside a try/except, so if anything before close() raised (e.g. a SQLite "database is locked" under concurrent chat streams) the except only logged and the connection was never returned to the pool. DATABASE_URL defaults to file-backed SQLite, whose engine uses SQLAlchemy's default QueuePool (5 connections + 10 overflow). Repeated leaks on these hot paths exhaust the pool; later requests then block for pool_timeout and fail with "QueuePool limit ... reached", taking the app down until restart. Move the logic into two best-effort helpers in core.database, next to the existing session helpers (update_session_last_accessed, get_session_by_id): - get_session_mode(session_id) -> Optional[str] - set_session_mode(session_id, mode) -> bool Both route through the existing get_db_session() context manager, which commits on success, rolls back on error, and always closes in a finally, so the connection is returned to the pool on every path. chat_routes.py now calls these instead of hand-rolling sessions, also removing three copies of the same try/except. Add tests/test_session_mode_helpers.py: the helpers commit+close on success and, on a mid-operation DB error, swallow + roll back + close (no leak). The error-path tests fail against the old close()-inside-try pattern. --- core/database.py | 27 +++++++++++++ routes/chat_routes.py | 30 +++------------ tests/test_session_mode_helpers.py | 61 ++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 tests/test_session_mode_helpers.py diff --git a/core/database.py b/core/database.py index 10d99f5..29377c2 100644 --- a/core/database.py +++ b/core/database.py @@ -1755,6 +1755,33 @@ def update_session_last_accessed(session_id: str): return True return False +def get_session_mode(session_id: str): + """Return a session's persisted `mode`, or None if unset/unknown. + + Best-effort: never raises (returns None on any DB error) so callers on hot + request paths needn't guard it. Routed through get_db_session() so the + connection is always returned to the pool.""" + try: + with get_db_session() as db: + return db.query(Session.mode).filter(Session.id == session_id).scalar() + except Exception: + logger.warning("Failed to read mode for session %s", session_id) + return None + +def set_session_mode(session_id: str, mode: str) -> bool: + """Persist a session's `mode`. Best-effort: never raises, returns success. + + Routed through get_db_session() so a failure mid-write (e.g. a SQLite + 'database is locked' under concurrent streams) still returns the connection + to the pool instead of leaking it — repeated leaks would exhaust it.""" + try: + with get_db_session() as db: + db.query(Session).filter(Session.id == session_id).update({"mode": mode}) + return True + except Exception: + logger.warning("Failed to persist mode %r for session %s", mode, session_id) + return False + def get_session_by_id(session_id: str): """Get a session by ID""" with get_db_session() as db: diff --git a/routes/chat_routes.py b/routes/chat_routes.py index bb72ea7..34c4a52 100644 --- a/routes/chat_routes.py +++ b/routes/chat_routes.py @@ -23,7 +23,7 @@ from src.prompt_security import untrusted_context_message from core.exceptions import SessionNotFoundError from src.auth_helpers import get_current_user from routes.session_routes import _verify_session_owner -from core.database import SessionLocal +from core.database import SessionLocal, get_session_mode, set_session_mode from core.database import Session as DBSession, ChatMessage as DBChatMessage from core.database import Document as DBDocument, ModelEndpoint from routes.research_routes import _resolve_research_endpoint @@ -326,26 +326,14 @@ def setup_chat_routes( # Check for research_pending BEFORE mode persist overwrites it do_research = str(use_research).lower() == "true" if not do_research: - try: - _mode_db = SessionLocal() - _db_mode = _mode_db.query(DBSession.mode).filter(DBSession.id == session).scalar() - _mode_db.close() - if _db_mode == 'research_pending': - do_research = True - logger.info(f"Session {session} in research_pending — auto-triggering research") - except Exception: - pass + if get_session_mode(session) == 'research_pending': + do_research = True + logger.info(f"Session {session} in research_pending — auto-triggering research") # Persist session mode (research > agent > chat) _effective_mode = 'research' if do_research else (chat_mode or 'chat') if _effective_mode in ('agent', 'research', 'chat'): - try: - _mdb = SessionLocal() - _mdb.query(DBSession).filter(DBSession.id == session).update({"mode": _effective_mode}) - _mdb.commit() - _mdb.close() - except Exception as _me: - logger.warning("Failed to persist session mode: %s", _me) + set_session_mode(session, _effective_mode) att_ids = [] if body and isinstance(body.get("attachments"), list): @@ -547,13 +535,7 @@ def setup_chat_routes( logger.info(f"First research message — asking clarifying questions for: {message[:60]}") yield f'data: {json.dumps({"type": "model_info", "model": sess.model, "suffix": "Research"})}\n\n' # Set DB mode to research_pending so the NEXT message auto-triggers research - try: - _pdb = SessionLocal() - _pdb.query(DBSession).filter(DBSession.id == session).update({"mode": "research_pending"}) - _pdb.commit() - _pdb.close() - except Exception as _pe: - logger.warning(f"Failed to set research_pending: {_pe}") + set_session_mode(session, "research_pending") ctx.messages.insert(0, {"role": "system", "content": "The user wants to start deep web research. Before searching, ask 2-3 brief " "clarifying questions to understand exactly what they want to know. For example: " diff --git a/tests/test_session_mode_helpers.py b/tests/test_session_mode_helpers.py new file mode 100644 index 0000000..04c9ffa --- /dev/null +++ b/tests/test_session_mode_helpers.py @@ -0,0 +1,61 @@ +"""Pin the leak-safety of the session-mode DB helpers. + +chat_routes.py persists a session's "mode" in three best-effort spots (read +current mode, persist the effective mode, set research_pending). Those spots +previously hand-rolled `SessionLocal()` with `.close()` as the LAST statement +inside a try/except — so any error before close() (e.g. a SQLite "database is +locked" under concurrent streams) leaked the connection. With the default +QueuePool for file SQLite (5 + 10 overflow), accumulated leaks exhaust the +pool and the app can no longer obtain a DB session until restart. + +The logic now lives in core.database.{get,set}_session_mode, which route +through get_db_session() (commit/rollback + guaranteed close). These tests pin +that a mid-operation DB error neither raises out of the helper nor leaks the +connection. The error-path cases fail against the old close()-inside-try +pattern. +""" +import os +os.environ.setdefault("DATABASE_URL", "sqlite:///:memory:") + +from unittest.mock import MagicMock + +from core import database as db + + +def _mock_session(monkeypatch): + """Make get_db_session() hand out a MagicMock session (no real DB).""" + sess = MagicMock() + monkeypatch.setattr(db, "SessionLocal", lambda: sess) + return sess + + +def test_set_session_mode_commits_and_closes_on_success(monkeypatch): + sess = _mock_session(monkeypatch) + assert db.set_session_mode("s1", "agent") is True + sess.query.return_value.filter.return_value.update.assert_called_once_with({"mode": "agent"}) + sess.commit.assert_called_once() + sess.close.assert_called_once() + + +def test_set_session_mode_does_not_leak_on_error(monkeypatch): + sess = _mock_session(monkeypatch) + sess.query.return_value.filter.return_value.update.side_effect = RuntimeError("database is locked") + # Best-effort: the error is swallowed and False returned... + assert db.set_session_mode("s1", "agent") is False + # ...and crucially the connection is still returned to the pool. + sess.rollback.assert_called_once() + sess.close.assert_called_once() + + +def test_get_session_mode_reads_and_closes(monkeypatch): + sess = _mock_session(monkeypatch) + sess.query.return_value.filter.return_value.scalar.return_value = "research_pending" + assert db.get_session_mode("s1") == "research_pending" + sess.close.assert_called_once() + + +def test_get_session_mode_does_not_leak_on_error(monkeypatch): + sess = _mock_session(monkeypatch) + sess.query.return_value.filter.return_value.scalar.side_effect = RuntimeError("database is locked") + assert db.get_session_mode("s1") is None + sess.close.assert_called_once()