diff --git a/routes/email_pollers.py b/routes/email_pollers.py index f6a7b9a..529ba00 100644 --- a/routes/email_pollers.py +++ b/routes/email_pollers.py @@ -99,6 +99,36 @@ async def _run_auto_summarize_once(do_summary: bool = True, do_reply: bool = Tru _save_settings(s2) +def _latest_inbox_fallback_uids(conn, reconnect): + """Latest INBOX UIDs via ``SEARCH ALL``, with a poisoned-socket guard (#1613). + + On a large Gmail mailbox the fallback ``SEARCH ALL`` can time out mid-reply, + leaving its enormous ``* SEARCH `` line unread on the socket. The next + command (the downstream re-select / EXAMINE) then reads those leftover bytes + and fails with ``EXAMINE => unexpected response: b'325188 …'``. Reconnecting + on failure guarantees the downstream command starts from a clean socket. + + Returns ``(uids, conn)`` — ``conn`` is the live connection to keep using: the + same one on success, a fresh one (via ``reconnect()``) if we had to recover. + """ + try: + conn.select("INBOX", readonly=True) + status, data = conn.uid("SEARCH", None, "ALL") + uids = [] + if status == "OK" and data and data[0]: + for u in reversed(data[0].split()[-8:]): + uids.append(("INBOX", u)) + logger.info("Email task SINCE scan found no messages; fell back to latest INBOX messages") + return uids, conn + except Exception as _e: + logger.warning(f"Latest-INBOX fallback scan failed: {_e}") + try: + conn.logout() + except Exception: + pass + return [], reconnect() + + async def _auto_summarize_pass(days_back: int = 1, account_id: str | None = None, progress_cb=None) -> str: """Single pass of the auto-summarize/reply scan. @@ -201,16 +231,12 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None # the latest visible inbox messages so Clear cache -> Run again can # actually repopulate AI reply/summary/tag caches. if not uid_list: - try: - conn.select("INBOX", readonly=True) - status, data = conn.uid("SEARCH", None, "ALL") - if status == "OK" and data and data[0]: - for u in reversed(data[0].split()[-8:]): - uid_list.append(("INBOX", u)) - logger.info("Email task SINCE scan found no messages; fell back to latest INBOX messages") - except Exception as _e: - logger.warning(f"Latest-INBOX fallback scan failed: {_e}") - # Re-select INBOX as default for downstream code + _fb_uids, conn = _latest_inbox_fallback_uids( + conn, lambda: _imap_connect(account_id, owner=account_owner) + ) + uid_list.extend(_fb_uids) + # Re-select INBOX as default for downstream code (on a clean socket even + # if the SEARCH ALL fallback above failed — see #1613). conn.select("INBOX", readonly=True) if not uid_list: return "No recent emails" diff --git a/tests/test_email_fallback_reconnect.py b/tests/test_email_fallback_reconnect.py new file mode 100644 index 0000000..3d3b5f3 --- /dev/null +++ b/tests/test_email_fallback_reconnect.py @@ -0,0 +1,69 @@ +"""Regression for issue #1613 — on a large Gmail mailbox the email-summary +poller's `SEARCH ALL` fallback can time out mid-response, leaving its huge +`* SEARCH ` line unread on the socket. The next command (the downstream +re-select / EXAMINE) then reads those leftover bytes and fails with +`EXAMINE => unexpected response: b'325188 …'`. + +`_latest_inbox_fallback_uids` reconnects on a failed SEARCH ALL so the downstream +command always runs on a clean socket. Tested with a fake IMAP connection — no +live server needed; reconnecting is correct by construction (a fresh connection +cannot carry the old one's leftover bytes). +""" +from routes import email_pollers as ep + + +class _FakeConn: + def __init__(self, search_result=None, raise_on_search=False, name="orig"): + self.name = name + self._sr = search_result + self._raise = raise_on_search + self.selects = [] + self.logged_out = False + + def select(self, mailbox, readonly=False): + self.selects.append(mailbox) + return ("OK", [b""]) + + def uid(self, cmd, *args): + if cmd == "SEARCH": + if self._raise: + raise OSError("timed out") + return self._sr + return ("OK", [None]) + + def logout(self): + self.logged_out = True + + +def test_fallback_success_keeps_conn_and_returns_latest_uids(): + conn = _FakeConn(search_result=("OK", [b"1 2 3 4 5 6 7 8 9 10 11 12"])) + fresh = _FakeConn(name="fresh") + uids, out = ep._latest_inbox_fallback_uids(conn, lambda: fresh) + assert out is conn # no reconnect on success + assert not conn.logged_out + assert uids and all(f == "INBOX" for f, _ in uids) + assert len(uids) <= 8 # keeps only the latest few + + +def test_fallback_reconnects_on_poisoned_socket(): + conn = _FakeConn(raise_on_search=True) + fresh = _FakeConn(name="fresh") + calls = [] + + def reconnect(): + calls.append(1) + return fresh + + uids, out = ep._latest_inbox_fallback_uids(conn, reconnect) + assert uids == [] # failed scan yields nothing + assert out is fresh # downstream uses a FRESH connection + assert out is not conn # not the poisoned one + assert calls == [1] # reconnected exactly once + assert conn.logged_out # poisoned conn was closed + + +def test_fallback_empty_search_returns_no_uids_same_conn(): + conn = _FakeConn(search_result=("OK", [b""])) + uids, out = ep._latest_inbox_fallback_uids(conn, lambda: _FakeConn(name="fresh")) + assert uids == [] + assert out is conn