Use shared IMAP timeout for account tests (#1088)

This commit is contained in:
red person
2026-06-02 17:11:04 +03:00
committed by GitHub
parent 21b6f9344e
commit c7ddfd7dd2
3 changed files with 165 additions and 20 deletions

View File

@@ -608,7 +608,32 @@ def _list_email_accounts() -> list[dict]:
# ── IMAP helpers ── # ── IMAP helpers ──
_IMAP_TIMEOUT_SECONDS = 15 def _coerce_imap_timeout_seconds(raw: str | None) -> int:
try:
value = int(raw or "30")
except (TypeError, ValueError):
value = 30
return max(5, min(value, 300))
_IMAP_TIMEOUT_SECONDS = _coerce_imap_timeout_seconds(os.environ.get("ODYSSEUS_IMAP_TIMEOUT_SECONDS"))
def _open_imap_connection(host: str, port: int, *, starttls: bool, timeout: int = _IMAP_TIMEOUT_SECONDS):
"""Open an IMAP connection using the configured security mode."""
port = int(port or 993)
if starttls:
conn = imaplib.IMAP4(host, port, timeout=timeout)
conn.starttls()
elif port == 993:
conn = imaplib.IMAP4_SSL(host, port, timeout=timeout)
else:
conn = imaplib.IMAP4(host, port, timeout=timeout)
try:
conn.sock.settimeout(timeout)
except Exception:
pass
return conn
def _imap_connect(account_id: str | None = None, owner: str = ""): def _imap_connect(account_id: str | None = None, owner: str = ""):
# SECURITY: passing `owner` scopes the fallback config lookup so a brand # SECURITY: passing `owner` scopes the fallback config lookup so a brand
@@ -622,17 +647,12 @@ def _imap_connect(account_id: str | None = None, owner: str = ""):
# The last branch is critical: previously this fell into IMAP4_SSL # The last branch is critical: previously this fell into IMAP4_SSL
# for any non-STARTTLS port, which would fail the TLS handshake on # for any non-STARTTLS port, which would fail the TLS handshake on
# plain local servers (Dovecot on 31143, etc.). # plain local servers (Dovecot on 31143, etc.).
if cfg.get("imap_starttls"): conn = _open_imap_connection(
conn = imaplib.IMAP4(cfg["imap_host"], cfg["imap_port"], timeout=_IMAP_TIMEOUT_SECONDS) cfg["imap_host"],
conn.starttls() cfg["imap_port"],
elif int(cfg.get("imap_port") or 993) == 993: starttls=bool(cfg.get("imap_starttls")),
conn = imaplib.IMAP4_SSL(cfg["imap_host"], cfg["imap_port"], timeout=_IMAP_TIMEOUT_SECONDS) timeout=_IMAP_TIMEOUT_SECONDS,
else: )
conn = imaplib.IMAP4(cfg["imap_host"], cfg["imap_port"], timeout=_IMAP_TIMEOUT_SECONDS)
try:
conn.sock.settimeout(_IMAP_TIMEOUT_SECONDS)
except Exception:
pass
conn.login(cfg["imap_user"], cfg["imap_password"]) conn.login(cfg["imap_user"], cfg["imap_password"])
return conn return conn

View File

@@ -17,7 +17,6 @@ import sqlite3 as _sql3
import email as email_mod import email as email_mod
import email.header import email.header
import email.utils import email.utils
import imaplib
import smtplib import smtplib
import json import json
import re import re
@@ -41,6 +40,7 @@ from routes.email_helpers import (
_q, _attach_compose_uploads, _cleanup_compose_uploads, _q, _attach_compose_uploads, _cleanup_compose_uploads,
_load_settings, _save_settings, _get_email_config, _load_settings, _save_settings, _get_email_config,
_send_smtp_message, _smtp_security_mode, _send_smtp_message, _smtp_security_mode,
_IMAP_TIMEOUT_SECONDS, _open_imap_connection,
_imap_connect, _imap, _decode_header, _detect_sent_folder, _detect_drafts_folder, _imap_connect, _imap, _decode_header, _detect_sent_folder, _detect_drafts_folder,
_extract_attachment_text, _list_attachments_from_msg, _extract_attachment_text, _list_attachments_from_msg,
_extract_attachment_to_disk, _extract_html, _extract_text, _extract_attachment_to_disk, _extract_html, _extract_text,
@@ -3103,13 +3103,12 @@ def setup_email_routes():
# port (Dovecot on 31143, etc.) would always fail the SSL # port (Dovecot on 31143, etc.) would always fail the SSL
# handshake because they're not actually wrapped in TLS. # handshake because they're not actually wrapped in TLS.
try: try:
if imap_starttls: conn = _open_imap_connection(
conn = imaplib.IMAP4(imap_host, imap_port, timeout=10) imap_host,
conn.starttls() imap_port,
elif imap_port == 993: starttls=imap_starttls,
conn = imaplib.IMAP4_SSL(imap_host, imap_port, timeout=10) timeout=_IMAP_TIMEOUT_SECONDS,
else: )
conn = imaplib.IMAP4(imap_host, imap_port, timeout=10)
try: try:
conn.login(imap_user, imap_pass) conn.login(imap_user, imap_pass)
imap_result = {"ok": True} imap_result = {"ok": True}

View File

@@ -0,0 +1,126 @@
import os
import tempfile
from pathlib import Path
import pytest
_tmp_data = Path(tempfile.mkdtemp(prefix="odysseus-email-imap-test-"))
os.environ.setdefault("DATA_DIR", str(_tmp_data))
os.environ.setdefault("DATABASE_URL", f"sqlite:///{_tmp_data / 'app.db'}")
from routes.email_helpers import (
_IMAP_TIMEOUT_SECONDS,
_coerce_imap_timeout_seconds,
_open_imap_connection,
)
class _FakeSock:
def __init__(self):
self.timeout = None
def settimeout(self, timeout):
self.timeout = timeout
class _FakeIMAP:
calls = []
def __init__(self, host, port, timeout=None):
self.host = host
self.port = port
self.timeout = timeout
self.sock = _FakeSock()
self.starttls_called = False
_FakeIMAP.calls.append(("connect", self.__class__.__name__, host, port, timeout))
def starttls(self):
self.starttls_called = True
_FakeIMAP.calls.append(("starttls", self.host, self.port))
def login(self, user, password):
_FakeIMAP.calls.append(("login", user, password))
def logout(self):
_FakeIMAP.calls.append(("logout", self.host, self.port))
class _FakeIMAPSSL(_FakeIMAP):
pass
def test_imap_timeout_defaults_and_clamps():
assert _coerce_imap_timeout_seconds(None) == 30
assert _coerce_imap_timeout_seconds("nonsense") == 30
assert _coerce_imap_timeout_seconds("2") == 5
assert _coerce_imap_timeout_seconds("999") == 300
def test_open_imap_connection_uses_shared_timeout_for_implicit_ssl(monkeypatch):
import routes.email_helpers as helpers
_FakeIMAP.calls = []
monkeypatch.setattr(helpers.imaplib, "IMAP4", _FakeIMAP)
monkeypatch.setattr(helpers.imaplib, "IMAP4_SSL", _FakeIMAPSSL)
conn = _open_imap_connection("imap.one.com", 993, starttls=False)
assert _FakeIMAP.calls == [
("connect", "_FakeIMAPSSL", "imap.one.com", 993, _IMAP_TIMEOUT_SECONDS)
]
assert conn.sock.timeout == _IMAP_TIMEOUT_SECONDS
def test_open_imap_connection_supports_starttls(monkeypatch):
import routes.email_helpers as helpers
_FakeIMAP.calls = []
monkeypatch.setattr(helpers.imaplib, "IMAP4", _FakeIMAP)
monkeypatch.setattr(helpers.imaplib, "IMAP4_SSL", _FakeIMAPSSL)
_open_imap_connection("imap.local", 143, starttls=True)
assert _FakeIMAP.calls == [
("connect", "_FakeIMAP", "imap.local", 143, _IMAP_TIMEOUT_SECONDS),
("starttls", "imap.local", 143),
]
@pytest.mark.asyncio
async def test_account_config_uses_shared_imap_timeout(monkeypatch):
import routes.email_routes as email_routes
captured = {}
class _Conn:
def login(self, user, password):
captured["login"] = (user, password)
def logout(self):
captured["logout"] = True
def fake_open(host, port, *, starttls, timeout):
captured["open"] = (host, port, starttls, timeout)
return _Conn()
class _Req:
async def json(self):
return {
"imap_host": "imap.one.com",
"imap_port": 993,
"imap_user": "user@example.com",
"imap_password": "pw",
"imap_starttls": False,
}
monkeypatch.setattr(email_routes, "_open_imap_connection", fake_open)
router = email_routes.setup_email_routes()
endpoint = next(route.endpoint for route in router.routes if route.path == "/api/email/accounts/test")
result = await endpoint(_Req(), owner="")
assert result["imap"] == {"ok": True}
assert captured["open"] == ("imap.one.com", 993, False, _IMAP_TIMEOUT_SECONDS)
assert captured["login"] == ("user@example.com", "pw")
assert captured["logout"] is True