Warm recent email read cache
This commit is contained in:
@@ -452,9 +452,12 @@ def setup_email_routes():
|
||||
_LIST_CACHE = {} # key → (expires_at, response_dict)
|
||||
_LIST_TTL = 8.0
|
||||
_READ_CACHE = {} # key → (expires_at, response_dict)
|
||||
_READ_TTL = 60.0
|
||||
_READ_TTL = 30 * 60.0
|
||||
_IMAP_POOL = {} # account_id → (conn, last_used_at)
|
||||
_IMAP_IDLE_MAX = 60.0
|
||||
_WARMING_READS = set()
|
||||
_WARM_READ_LIMIT = 24
|
||||
_WARM_RECENT_SECONDS = 7 * 24 * 60 * 60
|
||||
_pool_lock = _threading.Lock()
|
||||
|
||||
def _pooled_connect(account_id, owner=""):
|
||||
@@ -949,6 +952,7 @@ def setup_email_routes():
|
||||
if not cache_bust:
|
||||
cached = _list_cache_get(ck)
|
||||
if cached is not None:
|
||||
_schedule_recent_email_warm(cached.get("emails") or [], folder, account_id, owner)
|
||||
return cached
|
||||
result = await _asyncio.to_thread(
|
||||
_list_emails_sync, folder, limit, offset, filter, account_id, from_addr,
|
||||
@@ -957,6 +961,7 @@ def setup_email_routes():
|
||||
if result and not result.get("error"):
|
||||
if offset == 0 and not from_addr and not has_attachments and filter in ("all", "unread", "unanswered", "undone"):
|
||||
_record_email_received_events(owner, account_id, folder, result.get("emails") or [])
|
||||
_schedule_recent_email_warm(result.get("emails") or [], folder, account_id, owner)
|
||||
_list_cache_put(ck, result)
|
||||
return result
|
||||
|
||||
@@ -1117,7 +1122,7 @@ def setup_email_routes():
|
||||
logger.error(f"Search failed: {e}")
|
||||
return {"emails": [], "total": 0, "error": "Mail operation failed"}
|
||||
|
||||
def _read_email_sync(uid, folder, account_id, owner):
|
||||
def _read_email_sync(uid, folder, account_id, owner, mark_seen=True):
|
||||
"""Sync IMAP read — wrapped in to_thread by the async handler.
|
||||
|
||||
Two-phase: read body in readonly to avoid races with concurrent reads
|
||||
@@ -1156,14 +1161,15 @@ def setup_email_routes():
|
||||
parsed_date = email.utils.parsedate_to_datetime(date_str) if date_str else None
|
||||
attachments = _list_attachments_from_msg(msg)
|
||||
|
||||
# Set \Seen in a separate readwrite session so concurrent reads
|
||||
# of the same UID don't fight over a shared SELECT state.
|
||||
try:
|
||||
with _imap(account_id, owner=owner) as conn2:
|
||||
conn2.select(_q(folder))
|
||||
conn2.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen")
|
||||
except Exception:
|
||||
pass
|
||||
if mark_seen:
|
||||
# Set \Seen in a separate readwrite session so concurrent reads
|
||||
# of the same UID don't fight over a shared SELECT state.
|
||||
try:
|
||||
with _imap(account_id, owner=owner) as conn2:
|
||||
conn2.select(_q(folder))
|
||||
conn2.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen")
|
||||
except Exception:
|
||||
pass
|
||||
_t_total = _t.monotonic() - _t0
|
||||
if _t_total > 2.0:
|
||||
logger.warning(
|
||||
@@ -1269,18 +1275,75 @@ def setup_email_routes():
|
||||
logger.error(f"Failed to read email {uid}: {e}")
|
||||
return {"error": "Mail operation failed"}
|
||||
|
||||
def _mark_email_seen_sync(uid, folder, account_id, owner):
|
||||
try:
|
||||
with _imap(account_id, owner=owner) as conn:
|
||||
conn.select(_q(folder))
|
||||
conn.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen")
|
||||
_invalidate_list_cache(account_id, folder)
|
||||
except Exception as e:
|
||||
logger.debug(f"mark-seen after cached read failed uid={uid}: {e}")
|
||||
|
||||
@router.get("/read/{uid}")
|
||||
async def read_email_by_uid(uid: str, folder: str = Query("INBOX"), account_id: str | None = Query(None), owner: str = Depends(require_owner)):
|
||||
"""Read email body. Cached for 60s, sync IMAP work runs in a thread."""
|
||||
"""Read email body. Cached for 30m, sync IMAP work runs in a thread."""
|
||||
ck = _read_cache_key(account_id, folder, uid, owner=owner)
|
||||
cached = _read_cache_get(ck)
|
||||
if cached is not None:
|
||||
try:
|
||||
_asyncio.create_task(_asyncio.to_thread(_mark_email_seen_sync, uid, folder, account_id, owner))
|
||||
except RuntimeError:
|
||||
pass
|
||||
return cached
|
||||
result = await _asyncio.to_thread(_read_email_sync, uid, folder, account_id, owner)
|
||||
if result and not result.get("error"):
|
||||
_read_cache_put(ck, result)
|
||||
return result
|
||||
|
||||
def _schedule_recent_email_warm(emails: list, folder: str, account_id: str | None, owner: str):
|
||||
if not emails or folder == "__scheduled__":
|
||||
return
|
||||
now = _time.time()
|
||||
selected = []
|
||||
for em in emails:
|
||||
uid = str((em or {}).get("uid") or "").strip()
|
||||
if not uid:
|
||||
continue
|
||||
try:
|
||||
epoch = float((em or {}).get("date_epoch") or 0)
|
||||
except Exception:
|
||||
epoch = 0
|
||||
if epoch and now - epoch > _WARM_RECENT_SECONDS:
|
||||
continue
|
||||
ck = _read_cache_key(account_id, folder, uid, owner=owner)
|
||||
if _read_cache_get(ck) is not None or ck in _WARMING_READS:
|
||||
continue
|
||||
selected.append((uid, ck))
|
||||
if len(selected) >= _WARM_READ_LIMIT:
|
||||
break
|
||||
if not selected:
|
||||
return
|
||||
|
||||
async def _warm():
|
||||
for uid, ck in selected:
|
||||
if _read_cache_get(ck) is not None:
|
||||
continue
|
||||
_WARMING_READS.add(ck)
|
||||
try:
|
||||
result = await _asyncio.to_thread(_read_email_sync, uid, folder, account_id, owner, False)
|
||||
if result and not result.get("error"):
|
||||
_read_cache_put(ck, result)
|
||||
except Exception as e:
|
||||
logger.debug(f"email read warm skipped uid={uid}: {e}")
|
||||
finally:
|
||||
_WARMING_READS.discard(ck)
|
||||
await _asyncio.sleep(0.05)
|
||||
|
||||
try:
|
||||
_asyncio.create_task(_warm())
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
@router.get("/attachments/{uid}")
|
||||
async def list_attachments(uid: str, folder: str = Query("INBOX"), account_id: str | None = Query(None), owner: str = Depends(require_owner)):
|
||||
"""List attachments for an email."""
|
||||
|
||||
Reference in New Issue
Block a user