diff --git a/routes/email_routes.py b/routes/email_routes.py index 6ec887e..a9cc1ed 100644 --- a/routes/email_routes.py +++ b/routes/email_routes.py @@ -452,9 +452,12 @@ def setup_email_routes(): _LIST_CACHE = {} # key → (expires_at, response_dict) _LIST_TTL = 8.0 _READ_CACHE = {} # key → (expires_at, response_dict) - _READ_TTL = 60.0 + _READ_TTL = 30 * 60.0 _IMAP_POOL = {} # account_id → (conn, last_used_at) _IMAP_IDLE_MAX = 60.0 + _WARMING_READS = set() + _WARM_READ_LIMIT = 24 + _WARM_RECENT_SECONDS = 7 * 24 * 60 * 60 _pool_lock = _threading.Lock() def _pooled_connect(account_id, owner=""): @@ -949,6 +952,7 @@ def setup_email_routes(): if not cache_bust: cached = _list_cache_get(ck) if cached is not None: + _schedule_recent_email_warm(cached.get("emails") or [], folder, account_id, owner) return cached result = await _asyncio.to_thread( _list_emails_sync, folder, limit, offset, filter, account_id, from_addr, @@ -957,6 +961,7 @@ def setup_email_routes(): if result and not result.get("error"): if offset == 0 and not from_addr and not has_attachments and filter in ("all", "unread", "unanswered", "undone"): _record_email_received_events(owner, account_id, folder, result.get("emails") or []) + _schedule_recent_email_warm(result.get("emails") or [], folder, account_id, owner) _list_cache_put(ck, result) return result @@ -1117,7 +1122,7 @@ def setup_email_routes(): logger.error(f"Search failed: {e}") return {"emails": [], "total": 0, "error": "Mail operation failed"} - def _read_email_sync(uid, folder, account_id, owner): + def _read_email_sync(uid, folder, account_id, owner, mark_seen=True): """Sync IMAP read — wrapped in to_thread by the async handler. Two-phase: read body in readonly to avoid races with concurrent reads @@ -1156,14 +1161,15 @@ def setup_email_routes(): parsed_date = email.utils.parsedate_to_datetime(date_str) if date_str else None attachments = _list_attachments_from_msg(msg) - # Set \Seen in a separate readwrite session so concurrent reads - # of the same UID don't fight over a shared SELECT state. - try: - with _imap(account_id, owner=owner) as conn2: - conn2.select(_q(folder)) - conn2.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen") - except Exception: - pass + if mark_seen: + # Set \Seen in a separate readwrite session so concurrent reads + # of the same UID don't fight over a shared SELECT state. + try: + with _imap(account_id, owner=owner) as conn2: + conn2.select(_q(folder)) + conn2.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen") + except Exception: + pass _t_total = _t.monotonic() - _t0 if _t_total > 2.0: logger.warning( @@ -1269,18 +1275,75 @@ def setup_email_routes(): logger.error(f"Failed to read email {uid}: {e}") return {"error": "Mail operation failed"} + def _mark_email_seen_sync(uid, folder, account_id, owner): + try: + with _imap(account_id, owner=owner) as conn: + conn.select(_q(folder)) + conn.uid("STORE", _uid_bytes(uid), "+FLAGS", "\\Seen") + _invalidate_list_cache(account_id, folder) + except Exception as e: + logger.debug(f"mark-seen after cached read failed uid={uid}: {e}") + @router.get("/read/{uid}") async def read_email_by_uid(uid: str, folder: str = Query("INBOX"), account_id: str | None = Query(None), owner: str = Depends(require_owner)): - """Read email body. Cached for 60s, sync IMAP work runs in a thread.""" + """Read email body. Cached for 30m, sync IMAP work runs in a thread.""" ck = _read_cache_key(account_id, folder, uid, owner=owner) cached = _read_cache_get(ck) if cached is not None: + try: + _asyncio.create_task(_asyncio.to_thread(_mark_email_seen_sync, uid, folder, account_id, owner)) + except RuntimeError: + pass return cached result = await _asyncio.to_thread(_read_email_sync, uid, folder, account_id, owner) if result and not result.get("error"): _read_cache_put(ck, result) return result + def _schedule_recent_email_warm(emails: list, folder: str, account_id: str | None, owner: str): + if not emails or folder == "__scheduled__": + return + now = _time.time() + selected = [] + for em in emails: + uid = str((em or {}).get("uid") or "").strip() + if not uid: + continue + try: + epoch = float((em or {}).get("date_epoch") or 0) + except Exception: + epoch = 0 + if epoch and now - epoch > _WARM_RECENT_SECONDS: + continue + ck = _read_cache_key(account_id, folder, uid, owner=owner) + if _read_cache_get(ck) is not None or ck in _WARMING_READS: + continue + selected.append((uid, ck)) + if len(selected) >= _WARM_READ_LIMIT: + break + if not selected: + return + + async def _warm(): + for uid, ck in selected: + if _read_cache_get(ck) is not None: + continue + _WARMING_READS.add(ck) + try: + result = await _asyncio.to_thread(_read_email_sync, uid, folder, account_id, owner, False) + if result and not result.get("error"): + _read_cache_put(ck, result) + except Exception as e: + logger.debug(f"email read warm skipped uid={uid}: {e}") + finally: + _WARMING_READS.discard(ck) + await _asyncio.sleep(0.05) + + try: + _asyncio.create_task(_warm()) + except RuntimeError: + pass + @router.get("/attachments/{uid}") async def list_attachments(uid: str, folder: str = Query("INBOX"), account_id: str | None = Query(None), owner: str = Depends(require_owner)): """List attachments for an email."""