fix(email): scope AI caches by owner (#2695)

This commit is contained in:
anduimagui
2026-06-05 01:21:50 +01:00
committed by GitHub
parent 23fb5e169a
commit f9c81f3c8d
5 changed files with 247 additions and 49 deletions

View File

@@ -266,6 +266,48 @@ COMPOSE_UPLOADS_DIR.mkdir(parents=True, exist_ok=True)
SCHEDULED_DB = DATA_DIR / "scheduled_emails.db"
OWNER_SCOPED_EMAIL_CACHE_TABLES = {
"email_summaries",
"email_ai_replies",
"email_calendar_extractions",
"email_urgency_alerts",
}
def _email_cache_owner_clause(owner: str = "") -> tuple[str, tuple[str, ...]]:
owner = (owner or "").strip()
if owner:
return "owner = ?", (owner,)
return "(owner = '' OR owner IS NULL)", ()
def _ensure_owner_scoped_email_cache_table(conn, table: str, create_sql: str, columns: list[str]):
"""Rebuild legacy Message-ID-only cache tables with owner in the PK."""
conn.execute(create_sql)
try:
info = conn.execute(f"PRAGMA table_info({table})").fetchall()
cols = [r[1] for r in info]
pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])]
if "owner" in cols and pk_cols == ["message_id", "owner"]:
return
conn.execute(f"ALTER TABLE {table} RENAME TO {table}__old")
conn.execute(create_sql)
old_cols = [r[1] for r in conn.execute(f"PRAGMA table_info({table}__old)").fetchall()]
copy_cols = [c for c in columns if c != "owner" and c in old_cols]
source_owner = "COALESCE(owner, '')" if "owner" in old_cols else "''"
target_cols = ["owner", *copy_cols]
select_exprs = [source_owner, *copy_cols]
conn.execute(
f"INSERT OR IGNORE INTO {table} ({', '.join(target_cols)}) "
f"SELECT {', '.join(select_exprs)} FROM {table}__old"
)
conn.execute(f"DROP TABLE {table}__old")
except Exception as _mig_e:
import logging as _lg
_lg.getLogger(__name__).warning(f"{table} owner-migration skipped: {_mig_e}")
def attachment_extract_dir(folder: str, uid: str) -> Path:
"""Containment-safe extraction directory for an attachment.
@@ -301,30 +343,35 @@ def _init_scheduled_db():
owner TEXT DEFAULT ''
)
""")
# Email summary cache (keyed by Message-ID)
conn.execute("""
# Email summary cache. SECURITY: Message-IDs are global, so AI-derived
# cache rows must be owner-scoped just like email_tags.
_ensure_owner_scoped_email_cache_table(conn, "email_summaries", """
CREATE TABLE IF NOT EXISTS email_summaries (
message_id TEXT PRIMARY KEY,
message_id TEXT,
owner TEXT DEFAULT '',
uid TEXT,
folder TEXT,
subject TEXT,
sender TEXT,
summary TEXT NOT NULL,
model_used TEXT,
created_at TEXT NOT NULL
created_at TEXT NOT NULL,
PRIMARY KEY (message_id, owner)
)
""")
""", ["message_id", "owner", "uid", "folder", "subject", "sender", "summary", "model_used", "created_at"])
# Email AI reply cache (pre-generated draft replies)
conn.execute("""
_ensure_owner_scoped_email_cache_table(conn, "email_ai_replies", """
CREATE TABLE IF NOT EXISTS email_ai_replies (
message_id TEXT PRIMARY KEY,
message_id TEXT,
owner TEXT DEFAULT '',
uid TEXT,
folder TEXT,
reply TEXT NOT NULL,
model_used TEXT,
created_at TEXT NOT NULL
created_at TEXT NOT NULL,
PRIMARY KEY (message_id, owner)
)
""")
""", ["message_id", "owner", "uid", "folder", "reply", "model_used", "created_at"])
# Email tags / spam classification cache. SECURITY: keyed by
# (message_id, owner) because Message-IDs are GLOBAL (a newsletter goes
# to many users with the same Message-ID). Without owner-scoping, a
@@ -384,17 +431,20 @@ def _init_scheduled_db():
# Best-effort — log via the module logger if available
import logging as _lg
_lg.getLogger(__name__).warning(f"email_tags owner-migration skipped: {_mig_e}")
conn.execute("""
_ensure_owner_scoped_email_cache_table(conn, "email_calendar_extractions", """
CREATE TABLE IF NOT EXISTS email_calendar_extractions (
message_id TEXT PRIMARY KEY,
message_id TEXT,
owner TEXT DEFAULT '',
uid TEXT,
events_created INTEGER DEFAULT 0,
created_at TEXT NOT NULL
created_at TEXT NOT NULL,
PRIMARY KEY (message_id, owner)
)
""")
conn.execute("""
""", ["message_id", "owner", "uid", "events_created", "created_at"])
_ensure_owner_scoped_email_cache_table(conn, "email_urgency_alerts", """
CREATE TABLE IF NOT EXISTS email_urgency_alerts (
message_id TEXT PRIMARY KEY,
message_id TEXT,
owner TEXT DEFAULT '',
uid TEXT,
folder TEXT,
subject TEXT,
@@ -402,9 +452,10 @@ def _init_scheduled_db():
urgency TEXT,
reason TEXT,
alerted INTEGER DEFAULT 0,
created_at TEXT NOT NULL
created_at TEXT NOT NULL,
PRIMARY KEY (message_id, owner)
)
""")
""", ["message_id", "owner", "uid", "folder", "subject", "sender", "urgency", "reason", "alerted", "created_at"])
conn.execute("""
CREATE TABLE IF NOT EXISTS email_event_seen (
owner TEXT NOT NULL,

View File

@@ -39,7 +39,7 @@ from routes.email_helpers import (
_extract_attachment_text, _extract_text,
_pre_retrieve_context,
_attach_compose_uploads, _cleanup_compose_uploads, _q,
SCHEDULED_DB, _EMAIL_REPLY_SYS_PROMPT_BASE,
SCHEDULED_DB, _EMAIL_REPLY_SYS_PROMPT_BASE, _email_cache_owner_clause,
)
logger = logging.getLogger(__name__)
@@ -243,8 +243,15 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
await _emit_progress(progress_cb, f"Found {len(uid_list)} recent email(s); checking cache…")
_c = _sql3.connect(SCHEDULED_DB)
_sum_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_summaries").fetchall()}
_reply_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_ai_replies").fetchall()}
_cache_owner_clause, _cache_owner_params = _email_cache_owner_clause(account_owner)
_sum_existing = {r[0] for r in _c.execute(
f"SELECT message_id FROM email_summaries WHERE {_cache_owner_clause}",
_cache_owner_params,
).fetchall()}
_reply_existing = {r[0] for r in _c.execute(
f"SELECT message_id FROM email_ai_replies WHERE {_cache_owner_clause}",
_cache_owner_params,
).fetchall()}
if auto_tag or auto_spam:
if account_owner:
_tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner=?", (account_owner,)).fetchall()}
@@ -252,12 +259,18 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_tag_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_tags WHERE owner='' OR owner IS NULL").fetchall()}
else:
_tag_existing = set()
_cal_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_calendar_extractions").fetchall()} if auto_cal else set()
_cal_existing = {r[0] for r in _c.execute(
f"SELECT message_id FROM email_calendar_extractions WHERE {_cache_owner_clause}",
_cache_owner_params,
).fetchall()} if auto_cal else set()
# Urgency is handled by the built-in `check_email_urgency` task. Keep
# this legacy poller path disabled so users don't get two independent
# urgent-email systems.
auto_urgent = False
_urgent_existing = {r[0] for r in _c.execute("SELECT message_id FROM email_urgency_alerts").fetchall()} if auto_urgent else set()
_urgent_existing = {r[0] for r in _c.execute(
f"SELECT message_id FROM email_urgency_alerts WHERE {_cache_owner_clause}",
_cache_owner_params,
).fetchall()} if auto_urgent else set()
_c.close()
# Hoist the self-address lookup OUT of the per-email loop — fetching
@@ -415,9 +428,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_c = _sql3.connect(SCHEDULED_DB)
_c.execute("""
INSERT OR REPLACE INTO email_summaries
(message_id, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), _folder, subject, sender, summary, model, datetime.utcnow().isoformat()))
(message_id, owner, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _folder, subject, sender, summary, model, datetime.utcnow().isoformat()))
_c.commit()
_c.close()
_sum_existing.add(message_id)
@@ -458,9 +471,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_c = _sql3.connect(SCHEDULED_DB)
_c.execute("""
INSERT OR REPLACE INTO email_ai_replies
(message_id, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (message_id, uid.decode() if isinstance(uid, bytes) else str(uid), _folder, reply, model, datetime.utcnow().isoformat()))
(message_id, owner, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid), _folder, reply, model, datetime.utcnow().isoformat()))
_c.commit()
_c.close()
_reply_existing.add(message_id)
@@ -675,8 +688,8 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_cc = _sql3.connect(SCHEDULED_DB)
_cc.execute(
"INSERT OR REPLACE INTO email_calendar_extractions "
"(message_id, uid, events_created, created_at) VALUES (?, ?, ?, ?)",
(message_id, uid.decode() if isinstance(uid, bytes) else str(uid),
"(message_id, owner, uid, events_created, created_at) VALUES (?, ?, ?, ?, ?)",
(message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid),
_cal_run_count, datetime.utcnow().isoformat())
)
_cc.commit()
@@ -733,9 +746,9 @@ async def _auto_summarize_pass_single(days_back: int = 1, account_id: str | None
_uc = _sql3.connect(SCHEDULED_DB)
_uc.execute(
"INSERT OR REPLACE INTO email_urgency_alerts "
"(message_id, uid, folder, subject, sender, urgency, reason, alerted, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(message_id, uid.decode() if isinstance(uid, bytes) else str(uid),
"(message_id, owner, uid, folder, subject, sender, urgency, reason, alerted, created_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(message_id, account_owner or "", uid.decode() if isinstance(uid, bytes) else str(uid),
_folder, subject, sender, urgency, reason,
1 if urgency in ("critical", "high") else 0,
datetime.utcnow().isoformat())

View File

@@ -49,7 +49,7 @@ from routes.email_helpers import (
_EMAIL_REPLY_SYS_PROMPT_BASE, _POOL_HOOKS,
SendEmailRequest, ExtractStyleRequest,
ATTACHMENTS_DIR, COMPOSE_UPLOADS_DIR, SCHEDULED_DB,
attachment_extract_dir,
attachment_extract_dir, _email_cache_owner_clause,
)
from routes.email_pollers import _start_poller
@@ -934,9 +934,11 @@ def setup_email_routes():
import sqlite3 as _sql3
_c = _sql3.connect(SCHEDULED_DB)
placeholders = ",".join("?" * len(ids))
owner_clause, owner_params = _email_cache_owner_clause(owner)
rows = _c.execute(
f"SELECT message_id, summary FROM email_summaries WHERE message_id IN ({placeholders})",
ids,
f"SELECT message_id, summary FROM email_summaries "
f"WHERE message_id IN ({placeholders}) AND {owner_clause}",
(*ids, *owner_params),
).fetchall()
_c.close()
by_id = {r[0]: r[1] for r in rows}
@@ -1219,15 +1221,16 @@ def setup_email_routes():
try:
import sqlite3 as _sql3
_c = _sql3.connect(SCHEDULED_DB)
owner_clause, owner_params = _email_cache_owner_clause(owner)
_row = _c.execute(
"SELECT summary FROM email_summaries WHERE message_id = ?",
(message_id.strip(),),
f"SELECT summary FROM email_summaries WHERE message_id = ? AND {owner_clause}",
(message_id.strip(), *owner_params),
).fetchone()
if _row:
cached_summary = _row[0]
_row2 = _c.execute(
"SELECT reply FROM email_ai_replies WHERE message_id = ?",
(message_id.strip(),),
f"SELECT reply FROM email_ai_replies WHERE message_id = ? AND {owner_clause}",
(message_id.strip(), *owner_params),
).fetchone()
if _row2:
cached_ai_reply = _apply_email_style_mechanics(_extract_reply(_row2[0] or ""))
@@ -2549,10 +2552,10 @@ def setup_email_routes():
_c = _sql3.connect(SCHEDULED_DB)
_c.execute("""
INSERT OR REPLACE INTO email_summaries
(message_id, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
(message_id, owner, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
mid, data.get("uid", ""), data.get("folder", ""),
mid, owner, data.get("uid", ""), data.get("folder", ""),
subject, sender, content, model, datetime.utcnow().isoformat(),
))
_c.commit()
@@ -2587,9 +2590,10 @@ def setup_email_routes():
if message_id:
try:
_c = _sql3.connect(SCHEDULED_DB)
owner_clause, owner_params = _email_cache_owner_clause(owner)
_row = _c.execute(
"SELECT reply, model_used FROM email_ai_replies WHERE message_id = ?",
(message_id,),
f"SELECT reply, model_used FROM email_ai_replies WHERE message_id = ? AND {owner_clause}",
(message_id, *owner_params),
).fetchone()
_c.close()
if _row and _row[0]:
@@ -2791,9 +2795,9 @@ def setup_email_routes():
_c = _sql3.connect(SCHEDULED_DB)
_c.execute("""
INSERT OR REPLACE INTO email_ai_replies
(message_id, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (message_id, source_uid, source_folder, reply, model, datetime.utcnow().isoformat()))
(message_id, owner, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (message_id, owner, source_uid, source_folder, reply, model, datetime.utcnow().isoformat()))
_c.commit()
_c.close()
except Exception as e:

View File

@@ -455,7 +455,7 @@ def setup_task_routes(task_scheduler) -> APIRouter:
import sqlite3
from pathlib import Path
from routes.email_helpers import SCHEDULED_DB
from routes.email_helpers import SCHEDULED_DB, OWNER_SCOPED_EMAIL_CACHE_TABLES, _email_cache_owner_clause
cleared = {}
conn = sqlite3.connect(SCHEDULED_DB)
@@ -468,6 +468,13 @@ def setup_task_routes(task_scheduler) -> APIRouter:
(user,),
).fetchone()[0]
conn.execute("DELETE FROM email_tags WHERE owner = ? OR owner = ''", (user,))
elif table in OWNER_SCOPED_EMAIL_CACHE_TABLES and user:
owner_clause, owner_params = _email_cache_owner_clause(user)
before = conn.execute(
f"SELECT COUNT(*) FROM {table} WHERE {owner_clause}",
owner_params,
).fetchone()[0]
conn.execute(f"DELETE FROM {table} WHERE {owner_clause}", owner_params)
else:
before = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
conn.execute(f"DELETE FROM {table}")

View File

@@ -43,6 +43,129 @@ def test_email_tag_clause_keeps_legacy_rows_for_single_user_mode(monkeypatch):
assert params == [""]
def test_email_ai_cache_tables_are_owner_scoped_and_migrate_legacy_rows(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
conn = sqlite3.connect(db_path)
conn.execute(
"""
CREATE TABLE email_summaries (
message_id TEXT PRIMARY KEY,
uid TEXT,
folder TEXT,
subject TEXT,
sender TEXT,
summary TEXT NOT NULL,
model_used TEXT,
created_at TEXT NOT NULL
)
"""
)
conn.execute(
"""
INSERT INTO email_summaries
(message_id, uid, folder, subject, sender, summary, model_used, created_at)
VALUES ('<shared@example.com>', '1', 'INBOX', 'Subject', 'a@example.com', 'legacy', 'm', '2026-01-01')
"""
)
conn.commit()
conn.close()
email_helpers._init_scheduled_db()
conn = sqlite3.connect(db_path)
try:
for table in (
"email_summaries",
"email_ai_replies",
"email_calendar_extractions",
"email_urgency_alerts",
):
info = conn.execute(f"PRAGMA table_info({table})").fetchall()
pk_cols = [r[1] for r in sorted((r for r in info if r[5]), key=lambda r: r[5])]
assert pk_cols == ["message_id", "owner"]
assert conn.execute(
"SELECT owner, summary FROM email_summaries WHERE message_id=?",
("<shared@example.com>",),
).fetchone() == ("", "legacy")
conn.execute(
"""
INSERT INTO email_summaries
(message_id, owner, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
("<shared@example.com>", "alice", "2", "INBOX", "Subject", "a@example.com", "alice", "m", "2026-01-02"),
)
conn.execute(
"""
INSERT INTO email_summaries
(message_id, owner, uid, folder, subject, sender, summary, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
("<shared@example.com>", "bob", "3", "INBOX", "Subject", "a@example.com", "bob", "m", "2026-01-03"),
)
rows = conn.execute(
"SELECT owner, summary FROM email_summaries WHERE message_id=? ORDER BY owner",
("<shared@example.com>",),
).fetchall()
assert rows == [("", "legacy"), ("alice", "alice"), ("bob", "bob")]
finally:
conn.close()
@pytest.mark.asyncio
async def test_ai_reply_cache_lookup_is_owner_scoped(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers
import routes.email_routes as email_routes
db_path = tmp_path / "scheduled_emails.db"
monkeypatch.setattr(email_helpers, "SCHEDULED_DB", db_path)
monkeypatch.setattr(email_routes, "SCHEDULED_DB", db_path)
email_helpers._init_scheduled_db()
conn = sqlite3.connect(db_path)
conn.execute(
"""
INSERT INTO email_ai_replies
(message_id, owner, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
("<shared@example.com>", "alice", "1", "INBOX", "alice private draft", "m-a", "2026-01-01"),
)
conn.execute(
"""
INSERT INTO email_ai_replies
(message_id, owner, uid, folder, reply, model_used, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
("<shared@example.com>", "bob", "2", "INBOX", "bob private draft", "m-b", "2026-01-02"),
)
conn.commit()
conn.close()
router = email_routes.setup_email_routes()
ai_reply = _route_endpoint(router, "/api/email/ai-reply", "POST")
result = await ai_reply(
{
"to": "sender@example.com",
"subject": "Subject",
"original_body": "Body",
"message_id": "<shared@example.com>",
},
owner="bob",
)
assert result["success"] is True
assert result["cached"] is True
assert result["reply"] == "bob private draft"
assert result["model_used"] == "m-b"
@pytest.mark.asyncio
async def test_scheduled_email_routes_are_owner_scoped(tmp_path, monkeypatch):
import routes.email_helpers as email_helpers