On a large Gmail mailbox the email-summary poller's SINCE scan often finds nothing (INTERNALDATE/date-header quirks), so it falls back to SEARCH ALL. That returns one enormous UID line; the socket read can time out mid-response, and the exception was swallowed — leaving the unread '* SEARCH 325188 …' bytes on the socket. The next command (the downstream re-select) then read those leftover bytes and failed with 'EXAMINE => unexpected response: b'325188 …''. Extract the fallback into _latest_inbox_fallback_uids(conn, reconnect): on a failed SEARCH ALL it logs out the poisoned connection and reconnects, returning the fresh connection for downstream use. Reconnecting is correct by construction — a new connection cannot carry the old one's leftover bytes — so the re-select always runs on a clean socket. The same SEARCH ALL + reuse pattern also exists in mcp_servers/email_server.py and routes/email_routes.py; left for a separate change to keep this surgical. Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -99,6 +99,36 @@ async def _run_auto_summarize_once(do_summary: bool = True, do_reply: bool = Tru
|
|||||||
_save_settings(s2)
|
_save_settings(s2)
|
||||||
|
|
||||||
|
|
||||||
|
def _latest_inbox_fallback_uids(conn, reconnect):
    """Return the latest INBOX UIDs via ``SEARCH ALL``, with a poisoned-socket guard (#1613).

    On a large Gmail mailbox the fallback ``SEARCH ALL`` can time out mid-reply,
    leaving its enormous ``* SEARCH <uids…>`` line unread on the socket. The next
    command (the downstream re-select / EXAMINE) then reads those leftover bytes
    and fails with ``EXAMINE => unexpected response: b'325188 …'``. Reconnecting
    on failure guarantees the downstream command starts from a clean socket.

    Args:
        conn: Connection object exposing ``select``, ``uid`` and ``logout``
            (the calls match the ``imaplib`` API — confirm against the caller's
            connect helper).
        reconnect: Zero-argument callable returning a fresh connection. It is
            only invoked when the scan raises.

    Returns:
        ``(uids, conn)``. ``uids`` is a list of ``("INBOX", uid)`` tuples built
        from the last eight UIDs of the reply, in reverse reply order; it is
        empty when the search is not ``OK``, returns no data, or raises.
        ``conn`` is the live connection to keep using: the same one when the
        scan did not raise, a fresh one (via ``reconnect()``) if we had to
        recover.

    Raises:
        Whatever ``reconnect()`` raises; that call is deliberately not guarded,
        because there is no usable connection left to return in that case.
    """
    try:
        conn.select("INBOX", readonly=True)
        status, data = conn.uid("SEARCH", None, "ALL")
        uids = []
        if status == "OK" and data and data[0]:
            # Only the tail of the reply is needed; reversing puts the last UID
            # of the reply first.
            uids = [("INBOX", u) for u in reversed(data[0].split()[-8:])]
            logger.info("Email task SINCE scan found no messages; fell back to latest INBOX messages")
        return uids, conn
    except Exception as _e:
        logger.warning(f"Latest-INBOX fallback scan failed: {_e}")
        try:
            conn.logout()
        except Exception as _logout_err:
            # LOGOUT travels over the same poisoned socket, so it can fail too.
            # NOTE(review): on Python 3.8+ imaplib's logout() propagates that
            # failure and never reaches its own shutdown(), which would leave
            # the socket open — close it explicitly, still best-effort.
            logger.debug(f"Logout of poisoned IMAP connection failed: {_logout_err}")
            _shutdown = getattr(conn, "shutdown", None)
            if callable(_shutdown):
                try:
                    _shutdown()
                except Exception as _shutdown_err:
                    logger.debug(f"Shutdown of poisoned IMAP connection failed: {_shutdown_err}")
        return [], reconnect()
|
||||||
|
|
||||||
|
|
||||||
async def _auto_summarize_pass(days_back: int = 1, account_id: str | None = None, progress_cb=None) -> str:
|
async def _auto_summarize_pass(days_back: int = 1, account_id: str | None = None, progress_cb=None) -> str:
|
||||||
"""Single pass of the auto-summarize/reply scan.
|
"""Single pass of the auto-summarize/reply scan.
|
||||||
|
|
||||||
@@ -201,16 +231,12 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
|
|||||||
# the latest visible inbox messages so Clear cache -> Run again can
|
# the latest visible inbox messages so Clear cache -> Run again can
|
||||||
# actually repopulate AI reply/summary/tag caches.
|
# actually repopulate AI reply/summary/tag caches.
|
||||||
if not uid_list:
|
if not uid_list:
|
||||||
try:
|
_fb_uids, conn = _latest_inbox_fallback_uids(
|
||||||
conn.select("INBOX", readonly=True)
|
conn, lambda: _imap_connect(account_id, owner=account_owner)
|
||||||
status, data = conn.uid("SEARCH", None, "ALL")
|
)
|
||||||
if status == "OK" and data and data[0]:
|
uid_list.extend(_fb_uids)
|
||||||
for u in reversed(data[0].split()[-8:]):
|
# Re-select INBOX as default for downstream code (on a clean socket even
|
||||||
uid_list.append(("INBOX", u))
|
# if the SEARCH ALL fallback above failed — see #1613).
|
||||||
logger.info("Email task SINCE scan found no messages; fell back to latest INBOX messages")
|
|
||||||
except Exception as _e:
|
|
||||||
logger.warning(f"Latest-INBOX fallback scan failed: {_e}")
|
|
||||||
# Re-select INBOX as default for downstream code
|
|
||||||
conn.select("INBOX", readonly=True)
|
conn.select("INBOX", readonly=True)
|
||||||
if not uid_list:
|
if not uid_list:
|
||||||
return "No recent emails"
|
return "No recent emails"
|
||||||
|
|||||||
69
tests/test_email_fallback_reconnect.py
Normal file
69
tests/test_email_fallback_reconnect.py
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
"""Regression for issue #1613 — on a large Gmail mailbox the email-summary
|
||||||
|
poller's `SEARCH ALL` fallback can time out mid-response, leaving its huge
|
||||||
|
`* SEARCH <uids…>` line unread on the socket. The next command (the downstream
|
||||||
|
re-select / EXAMINE) then reads those leftover bytes and fails with
|
||||||
|
`EXAMINE => unexpected response: b'325188 …'`.
|
||||||
|
|
||||||
|
`_latest_inbox_fallback_uids` reconnects on a failed SEARCH ALL so the downstream
|
||||||
|
command always runs on a clean socket. Tested with a fake IMAP connection — no
|
||||||
|
live server needed; reconnecting is correct by construction (a fresh connection
|
||||||
|
cannot carry the old one's leftover bytes).
|
||||||
|
"""
|
||||||
|
from routes import email_pollers as ep
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeConn:
    """In-memory stand-in for the connection object used by the fallback helper.

    ``search_result`` is what a ``SEARCH`` command returns; with
    ``raise_on_search`` set, ``SEARCH`` raises ``OSError("timed out")`` instead.
    ``selects`` records every mailbox passed to ``select`` and ``logged_out``
    flips to ``True`` once ``logout`` has been called. ``name`` is only a label
    to tell instances apart.
    """

    def __init__(self, search_result=None, raise_on_search=False, name="orig"):
        self.name = name
        self.selects = []
        self.logged_out = False
        self._sr = search_result
        self._raise = raise_on_search

    def select(self, mailbox, readonly=False):
        # Record the mailbox; the readonly flag is accepted but not tracked.
        self.selects += [mailbox]
        return "OK", [b""]

    def uid(self, cmd, *args):
        # Anything other than SEARCH gets a canned, empty OK reply.
        if cmd != "SEARCH":
            return "OK", [None]
        if self._raise:
            raise OSError("timed out")
        return self._sr

    def logout(self):
        self.logged_out = True
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_success_keeps_conn_and_returns_latest_uids():
    """A successful SEARCH ALL keeps the connection and yields the latest eight UIDs.

    The exact list is pinned (last eight of the reply, final UID first) so a
    wrong slice or a wrong order cannot slip through.
    """
    conn = _FakeConn(search_result=("OK", [b"1 2 3 4 5 6 7 8 9 10 11 12"]))
    fresh = _FakeConn(name="fresh")
    uids, out = ep._latest_inbox_fallback_uids(conn, lambda: fresh)
    assert out is conn  # no reconnect on success
    assert not conn.logged_out
    assert not fresh.logged_out  # the spare connection is never touched
    # UIDs 5..12 are the last eight; they come back as 12, 11, …, 5.
    assert uids == [("INBOX", str(n).encode()) for n in range(12, 4, -1)]
    assert conn.selects == ["INBOX"]  # the helper selected INBOX exactly once
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_reconnects_on_poisoned_socket():
    """A SEARCH that raises retires the old connection and hands back a new one."""
    poisoned = _FakeConn(raise_on_search=True)
    replacement = _FakeConn(name="fresh")
    reconnect_calls = []

    def reconnect():
        # Count invocations so the test can prove a single reconnect happened.
        reconnect_calls.append(1)
        return replacement

    result = ep._latest_inbox_fallback_uids(poisoned, reconnect)
    found, live = result
    assert found == []  # failed scan yields nothing
    assert live is replacement  # downstream uses a FRESH connection
    assert live is not poisoned  # not the poisoned one
    assert reconnect_calls == [1]  # reconnected exactly once
    assert poisoned.logged_out  # poisoned conn was closed
|
||||||
|
|
||||||
|
|
||||||
|
def test_fallback_empty_search_returns_no_uids_same_conn():
    """An OK reply with no UIDs gives an empty list and keeps the original connection."""
    original = _FakeConn(search_result=("OK", [b""]))

    def make_fresh():
        # Handed to the helper as its reconnect callable.
        return _FakeConn(name="fresh")

    found, live = ep._latest_inbox_fallback_uids(original, make_fresh)
    assert found == []
    assert live is original
|
||||||
Reference in New Issue
Block a user